
All I was trying to do was build a basic multithreaded application where read and write requests are sent in random order from the main thread, and the read and write threads each pick up their requests and process them accordingly.

I was able to get my code to run, but when I don't put a 500 ms sleep after the end of the match in process_result, the requests, even though generated and forwarded by the main thread in random order, execute as read, read, read and then write, write, write (or vice versa).

But adding that sleep fixed it. I need help understanding this behaviour. I do get that unbounded channels have no order of processing, so maybe that's why it's behaving that way.
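
For reference, the sleep I'm describing goes right after the match inside process_result (simplified here; the full code is below):

while let Ok(req) = conn_rx.recv_timeout(Duration::from_millis(50)) {
    match req {
        OpType::Read => { let _ = read_tx.send(req); }
        OpType::Write(_) => { let _ = write_tx.send(req); }
    }
    // This 500 ms pause is what "fixes" the ordering; removing it reproduces the problem.
    std::thread::sleep(Duration::from_millis(500));
}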

use crossbeam_channel::unbounded; // Import from crossbeam_channel
use rand::prelude::*;
use std::sync::{mpsc, Arc, Mutex};
use std::thread::scope;
use std::time::Duration;

#[derive(Debug, Default)]
enum OpType {
    #[default]
    Read,
    Write(u8),
}

impl OpType {
    fn rand_write() -> Self {
        let mut rng = rand::thread_rng();
        OpType::Write(rng.gen_range(0..100))
    }
}

pub fn test_thread3() {
    let data = Arc::new(Mutex::new(0u8));

    let (read_tx, read_rx) = unbounded::<OpType>();
    let (write_tx, write_rx) = unbounded::<OpType>();
    let (conn_tx, conn_rx) = mpsc::channel::<OpType>();

    // Read thread process
    let process_read = || {
        println!("Starting Read Thread");
        let data = data.clone();
        while let Ok(OpType::Read) = read_rx.recv_timeout(Duration::from_millis(100)) {
            if let Ok(guard) = data.lock() {
                println!("Data = {:?}", *guard);
            }
        }
    };

    // Write thread process
    let process_write = || {
        let data = data.clone();
        println!("Starting Write Thread");
        while let Ok(OpType::Write(b)) = write_rx.recv_timeout(Duration::from_millis(100)) {
            if let Ok(mut guard) = data.lock() {
                println!("Writing {:?}", b);
                *guard = b;
            }
        }
    };

    // Main thread process
    let process_result = move || {
        println!("Main Thread Started");
        while let Ok(req) = conn_rx.recv_timeout(Duration::from_millis(50)) {
            match req {
                OpType::Read => {
                    let res = read_tx.send(req);
                    if res.is_err() {
                        println!("Panicked when reading");
                    }
                }
                OpType::Write(_) => {
                    let res = write_tx.send(req);
                    if res.is_err() {
                        println!("Panicked when writing");
                    }
                }
            }
        }
    };

    let mut user_request: Vec<OpType> = Vec::new();
    for _ in 0..=2 {
        user_request.push(OpType::default());
        user_request.push(OpType::rand_write());
    }
    user_request.shuffle(&mut rand::thread_rng());
    println!("{:?}", &user_request);

    for connection_request in user_request {
        conn_tx
            .send(connection_request)
            .expect("Error Sending Request To The ConnectionPool");
    }

    _ = scope(|s| {
        s.spawn(process_result);
        s.spawn(process_read);
        s.spawn(process_write);
    });
}

fn main() {
    test_thread3();
}

Example Result

The current run handles all write requests first, then all read requests.

The expected result should follow the order of the queue.

[Write(92), Read, Write(52), Write(47), Read, Read]
Main Thread Started
Starting Write Thread
Starting Read Thread
Writing 92
Writing 52
Writing 47
Data = 47
Data = 47
Data = 47

  • Unrelated to your issue, but note that the let data = data.clone(); lines in your threads are useless (and in fact you don't need an Arc at all, data could be a plain Mutex<u8>). – Jmb Commented Feb 12 at 15:46
  • Why is Arc unnecessary? – Apurba Pokharel Commented Feb 13 at 16:08
  • Arc is unnecessary because you're using scoped threads, which can access local variables from the calling function. Arc is only required for regular threads that may live longer than the current function and so can only access 'static or owned data. – Jmb Commented Feb 14 at 13:12

2 Answers


The expected result should follow the order of the queue.

Why? You're using unbounded channels, so there is a dependency from the read and write workers to the main thread, but that's it; aside from that, any interleaving depends on the scheduling decisions of the OS. Literally the one thing the main worker does is remove any possible dependency between the read and write queues.

So the main thread can go through the entire conn queue in one shot, filling the read and write queues; then either the write worker is scheduled and processes its entire queue without waiting and the read worker does the same afterwards, or the reverse, or there is some interleaving in between.

I do get that unbounded channels have no order of processing

Channels absolutely have an order of processing: they're strictly FIFO. You're the one decorrelating reads from writes by splitting the original queue into a write queue and a read queue with no synchronisation between the two.
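
If the original order matters, the simplest structure is to keep a single consumer draining the one queue, so FIFO order is preserved end to end. A minimal sketch under that assumption, reusing the names from the question:

// One worker consumes conn_rx directly, so requests are handled in queue order.
let process_in_order = move || {
    while let Ok(req) = conn_rx.recv_timeout(Duration::from_millis(50)) {
        match req {
            OpType::Read => {
                if let Ok(guard) = data.lock() {
                    println!("Data = {:?}", *guard);
                }
            }
            OpType::Write(b) => {
                if let Ok(mut guard) = data.lock() {
                    println!("Writing {:?}", b);
                    *guard = b;
                }
            }
        }
    }
};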

And note that even if the read and write queues were rendezvous channels (bounded channels with size 0), you could still get "unexpected" interleavings, because a rendezvous channel only synchronises on the channel interaction itself. So given the queue [read, write]:

  • main thread sends a read
  • read thread retrieves read
  • read thread is suspended
  • main thread sends a write
  • write thread retrieves write
  • write thread acquires lock

is a perfectly valid interleaving and still ends up with the write being performed before the read. A rendezvous channel only rules out some interleavings.
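
For illustration, this is what the rendezvous variant would look like (a sketch assuming crossbeam_channel::bounded; the interleaving above is still possible with it):

use crossbeam_channel::bounded;

// Zero-capacity (rendezvous) channels: send() blocks until a worker calls recv(),
// but after the handoff the worker can still be suspended before it locks the mutex.
let (read_tx, read_rx) = bounded::<OpType>(0);
let (write_tx, write_rx) = bounded::<OpType>(0);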

First, scope spawns OS threads, so the OS handles the scheduling, and each thread typically executes a chunk of code before control switches to another thread.

Secondly, each worker holds the mutex lock, which means that when another thread checks the mutex there is a high chance it is already locked, so that thread backs off and sleeps. Combined with the first point, this lets the current thread keep going.

Think of it like this: the mutex is effectively locked not only for the lifetime of the guard, but also for a few lines of code after the guard is dropped, because the waiting thread is probably sleeping and is unlikely to wake up immediately after the release, while the current thread locks the mutex again shortly afterwards.
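
You can see the effect in isolation with a small standalone sketch (hypothetical, not from the question): two threads hammering the same mutex in a tight loop usually run in bursts rather than strictly alternating, because the releasing thread re-acquires the lock before the sleeping one wakes up.

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let m = Arc::new(Mutex::new(()));
    let mut handles = Vec::new();
    for name in ["A", "B"] {
        let m = Arc::clone(&m);
        handles.push(thread::spawn(move || {
            for i in 0..10 {
                let _guard = m.lock().unwrap();
                // The same thread often prints several lines in a row:
                // it re-locks before the other thread gets a chance to wake up.
                println!("{name} {i}");
            }
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
}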

You can see this by changing the size of the request queue (the number of each Op type) from 2 to 10000: the tasks execute in parallel, but a chunk at a time. playground

When you add the sleep in the main thread, it creates a pause between each message and forces the other two threads to wait for the next message, and thus to release the mutex.

If you want to make sure the mutex is handed over in FIFO order, you might want to try parking_lot::FairMutex.
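
A minimal drop-in sketch, assuming the parking_lot crate is added as a dependency:

use parking_lot::FairMutex;
use std::sync::Arc;

// Replacement for the Arc<Mutex<u8>> in the question: a FairMutex hands the lock
// to waiting threads in FIFO order instead of letting the releasing thread
// immediately re-acquire it.
let data = Arc::new(FairMutex::new(0u8));

// parking_lot locks can't be poisoned, so lock() returns the guard directly.
let mut guard = data.lock();
*guard = 42;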
