Chapter 16: Fearless Concurrency


With Rust’s ownership and type systems, many concurrency bugs become compile-time errors instead of runtime errors.

Thread

An example of thread API usage. As in most languages, we create a thread with std::thread::spawn and wait for it to finish by calling join on the returned JoinHandle.

use std::thread;
use std::time::Duration;

fn test_multi_thread() {
    let mut handlers = vec![];
    for _ in 1..10 {
        let handler = thread::spawn(|| {
            println!("Hello world!");
            thread::sleep(Duration::from_millis(1));
        });
        handlers.push(handler);
    }

    for handler in handlers {
        handler.join().unwrap();
    }
}
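
The value returned by join is a Result: it carries whatever the closure returned, or an error if the thread panicked. The sketch below illustrates this with two hypothetical helpers, sum_on_thread and join_reports_panic, which are not part of the original example.

```rust
use std::thread;

// Hypothetical helper: sums 0..n on a worker thread and returns the result.
fn sum_on_thread(n: u64) -> u64 {
    let handle = thread::spawn(move || (0..n).sum::<u64>());
    // join() returns Result<u64, _>; Err means the worker panicked.
    handle.join().expect("worker thread panicked")
}

// Hypothetical helper: shows that a panic in the worker surfaces as Err.
fn join_reports_panic() -> bool {
    let handle = thread::spawn(|| {
        panic!("boom");
    });
    handle.join().is_err()
}

fn main() {
    println!("sum = {}", sum_on_thread(5));
    println!("panic reported: {}", join_reports_panic());
}
```

This is why the examples in this chapter call unwrap() on join(): it propagates a panic from the spawned thread into the thread that joins it.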

We may want the closure to access a variable from the main thread’s context. If that access requires ownership, we must explicitly mark the thread body (the closure) with the move keyword:

// Assumed definition: the original snippet does not show MyStruct
#[derive(Debug)]
struct MyStruct {
    val: i32,
}

fn test_move_to_thread() {
    let mut handlers = vec![];
    for i in 1..10 {
        let my_struct = MyStruct { val: i };
        println!("{:?}", my_struct);
        // the move keyword forces the closure to take ownership of my_struct
        let handler = thread::spawn(move || {
            println!("[{:?}] Hello world!", my_struct);
            thread::sleep(Duration::from_millis(1));
        });
        // This line will cause an error b/c my_struct is moved into the spawned thread at this
        // point
        // println!("{:?}", my_struct);
        handlers.push(handler);
    }

    for handler in handlers {
        handler.join().unwrap();
    }
}

Note that, since move transfers ownership from the main thread to the spawned thread, we can no longer access the variable in the main thread after creating the thread.

Message-Passing Concurrency

The Rust standard library provides an mpsc (multiple producer, single consumer) channel:

let (tx, rx) = mpsc::channel();

tx stands for “transmitter”, and rx stands for “receiver”.

We can send and receive using send and recv:

use std::sync::mpsc::{self, Receiver, Sender};

fn send_recv_block() {
    let (tx, rx): (Sender<i32>, Receiver<i32>) = mpsc::channel();
    let val = 42;
    tx.send(val).unwrap();
    println!("Send: {}", val);
    let received = rx.recv().unwrap();
    println!("Recv: {}", received);
}

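send on this channel does not block: the channel is unbounded, so the call returns as soon as the value is queued. recv blocks until a message arrives or every sender has been dropped.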
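
To make the blocking behaviour concrete, here is a minimal sketch that polls a channel with try_recv, the non-blocking counterpart of recv in std::sync::mpsc. The helper channel_states and the alias Polled are hypothetical names introduced only for this illustration.

```rust
use std::sync::mpsc::{self, TryRecvError};

type Polled = Result<i32, TryRecvError>;

// Hypothetical helper: polls the channel in three different states.
fn channel_states() -> (Polled, Polled, Polled) {
    let (tx, rx) = mpsc::channel::<i32>();

    // Nothing has been sent yet: recv() would block here, try_recv() does not.
    let empty = rx.try_recv();

    // send() returns immediately even though nobody is waiting in recv().
    tx.send(1).unwrap();
    let ready = rx.try_recv();

    // Once every sender is dropped and the queue is empty, the channel is closed.
    drop(tx);
    let closed = rx.try_recv();

    (empty, ready, closed)
}

fn main() {
    let (empty, ready, closed) = channel_states();
    println!("{:?} {:?} {:?}", empty, ready, closed);
}
```

In the same closed state, recv returns an Err instead of blocking forever.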

We can send in one thread and receive in another thread.

fn spsc_basic() {
    // There is no need to explicitly annotate the type in this example
    // We are doing this only for illustration purpose
    let (tx, rx): (Sender<String>, Receiver<String>) = mpsc::channel();

    let handle = thread::spawn(move || {
        let val = String::from("hi");
        println!("Send: {}", val);
        tx.send(val).unwrap();
        // This line will cause error b/c the ownership of `val` is taken by `tx.send()`
        // println!("Send: {}", val);
    });

    handle.join().unwrap();

    let received = rx.recv().unwrap();
    println!("Recv: {}", received);

    println!("Done!");
}

send takes ownership of the sent value, which is why val cannot be used after tx.send(val). Intuitively, this makes sense: if we send something to someone, we no longer hold it afterwards. The move on the closure is needed for a separate reason: the spawned thread must own tx, because it may outlive the function that created the channel.

When multiple messages are sent, we can receive them in a for loop:

fn spsc_many_message() {
    let (tx, rx): (Sender<String>, Receiver<String>) = mpsc::channel();

    let handle = thread::spawn(move || {
        let val1 = String::from("hi");
        let val2 = String::from("from");
        let val3 = String::from("thread");
        tx.send(val1).unwrap();
        tx.send(val2).unwrap();
        tx.send(val3).unwrap();
    });

    handle.join().unwrap();

    // A for loop to iterate all received messages
    for received in rx {
        println!("Recv: {}", received);
    }

    println!("Done!");
}

We can have multiple senders by cloning tx, either with mpsc::Sender::clone(&tx) or tx.clone():

fn mpsc_basic() {
    let vals = [
        String::from("hi"),
        String::from("from"),
        String::from("many"),
        String::from("threads"),
    ];
    let mut handles = vec![];

    let (tx, rx) = mpsc::channel();

    for val in vals {
        let send_val = val.clone();
        let tx_clone = mpsc::Sender::clone(&tx);
        let handle = thread::spawn(move || {
            println!("Send: {}", send_val);
            tx_clone.send(send_val).unwrap();
        });
        handles.push(handle);
    }

    // If we don't drop tx, the rx will keep blocking
    // In fact, rx will keep blocking until all clones (including original) of tx are dropped

    // If we create a clone of tx b/f dropping it, rx will still keep blocking.
    // let tx_clone = tx.clone();
    // println!("{:?}", tx_clone);

    drop(tx);

    for handle in handles {
        handle.join().unwrap();
    }

    for recv_val in rx {
        println!("Recv: {}", recv_val);
    }
}

Note that the for loop over received messages ends only once every sender, including the original tx and all of its clones, has been dropped, no matter which thread holds it. If we remove the drop(tx) in the example above, the main thread will keep blocking even after all spawned threads have exited.

Shared-State Concurrency

A Mutex gives a thread exclusive access to a piece of data. Mutex in Rust is a bit different from that in C++: in C++, a mutex protects a piece of code by wrapping it in lock/unlock calls, while in Rust a Mutex owns and protects a specific value.

use std::sync::Mutex;

fn mutex_basic() {
    let m = Mutex::new(5);

    println!("{:?}", m);

    {
        // lock() returns a MutexGuard<i32>, a smart pointer acting as an RAII guard.
        // That is, the mutex is unlocked when the MutexGuard goes out of scope.
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("{:?}", m);
}

To create a counter that is incremented by multiple threads concurrently, we need std::sync::Arc. Arc (Atomically Reference Counted) is the thread-safe version of Rc: it updates the reference count atomically.

With Arc, multiple threads can share ownership of a piece of data and read it concurrently:

use std::sync::Arc;
use std::thread;

fn arc_basic() {
    // Arc is just like Rc, except that it can be shared across threads b/c it guarantees
    // atomic updates of the internal counter
    let number = Arc::new(42);
    // Rc doesn't implement Send trait, which is required for it to be safely sent between threads
    // let number = Rc::new(42);
    let mut handlers = vec![];
    for i in 0..5 {
        let number = Arc::clone(&number);
        let handler = thread::spawn(move || {
            let number = *number;
            println!("[{}] The number is: {}", i, number);
        });
        handlers.push(handler);
    }

    for handler in handlers {
        handler.join().unwrap();
    }

    println!("[main] Number: {:?}", *number);
}
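
The reference count itself can be observed with Arc::strong_count. The following sketch, built around a hypothetical helper strong_counts, shows the count going up when a clone is made for a thread and back down once that thread has finished and dropped its clone.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical helper: records the strong count at three points in time.
fn strong_counts() -> (usize, usize, usize) {
    let data = Arc::new(vec![1, 2, 3]);
    let before = Arc::strong_count(&data);

    // Cloning the Arc copies the pointer and increments the count atomically.
    let for_thread = Arc::clone(&data);
    let during = Arc::strong_count(&data);

    // The clone is moved into the thread and dropped when the closure ends.
    let handle = thread::spawn(move || for_thread.len());
    handle.join().unwrap();
    let after = Arc::strong_count(&data);

    (before, during, after)
}

fn main() {
    println!("{:?}", strong_counts());
}
```

The underlying vector is allocated once and freed only when the last Arc pointing to it is dropped.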

Furthermore, Mutex + Arc allows us to define the counter mentioned earlier:

fn mutex_counter_arc() {
    let counter = Arc::new(Mutex::new(0));
    println!("Before: {:?}", counter);
    let mut handlers = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handler = thread::spawn(move || {
            let mut val = counter.lock().unwrap();
            *val += 1;
        });
        handlers.push(handler);
    }

    for handler in handlers {
        handler.join().unwrap();
    }

    println!("After : {:?}", counter);
}

The combination of Mutex + Arc is similar to RefCell + Rc: both provide interior mutability, i.e. a variable that is not declared mut can still hand out a mutable reference to the underlying value.

Note that Mutex + Arc can’t prevent deadlocks, just as RefCell + Rc can’t prevent reference cycles, in which the reference counts never reach zero and the values are leaked. Both problems are caused by some form of cyclic dependency.

Below is an example of a deadlock. It happens because thread 1 holds the mutex on num1 and waits for the mutex on num2, while thread 2 holds the mutex on num2 and waits for the mutex on num1.

fn deadlock_example() {
    let num1 = Arc::new(Mutex::new(10));
    let num2 = Arc::new(Mutex::new(100));
    println!("Before: {:?}, {:?}", num1, num2);

    let num1_arc1 = Arc::clone(&num1);
    let num1_arc2 = Arc::clone(&num1);
    let num2_arc1 = Arc::clone(&num2);
    let num2_arc2 = Arc::clone(&num2);

    let handle1 = thread::spawn(move || {
        let mut num1 = num1_arc1.lock().unwrap();
        println!("[1] Got num1");
        thread::sleep(Duration::from_millis(2000));
        let mut num2 = num2_arc1.lock().unwrap();
        println!("[1] Got num2");
        *num1 += 1;
        *num2 += 1;
        println!("[1] {}, {}", num1, num2);
    });

    let handle2 = thread::spawn(move || {
        let mut num2 = num2_arc2.lock().unwrap();
        println!("[2] Got num2");
        thread::sleep(Duration::from_millis(2000));
        let mut num1 = num1_arc2.lock().unwrap();
        println!("[2] Got num1");
        *num1 += 1;
        *num2 += 1;
        println!("[2] {}, {}", num1, num2);
    });

    handle1.join().unwrap();
    handle2.join().unwrap();

    println!("After: {:?}, {:?}", num1, num2);
}
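
One common way to avoid this particular deadlock is to make every thread acquire the locks in the same order. The sketch below is an assumption about how the example could be restructured, not part of the original code; the helper name ordered_locking is hypothetical.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: both threads lock num1 first and num2 second,
// so neither can hold num2 while waiting for num1.
fn ordered_locking() -> (i32, i32) {
    let num1 = Arc::new(Mutex::new(10));
    let num2 = Arc::new(Mutex::new(100));

    let mut handles = vec![];
    for _ in 0..2 {
        let num1 = Arc::clone(&num1);
        let num2 = Arc::clone(&num2);
        handles.push(thread::spawn(move || {
            let mut a = num1.lock().unwrap();
            let mut b = num2.lock().unwrap();
            *a += 1;
            *b += 1;
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    let a = *num1.lock().unwrap();
    let b = *num2.lock().unwrap();
    (a, b)
}

fn main() {
    println!("After: {:?}", ordered_locking());
}
```

With a fixed lock order the wait-for cycle described above cannot form, although the compiler still does not enforce the ordering for us.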

Sync & Send

If we use Rc instead of Arc in the arc_basic example above, we get a compile-time error instead of a runtime error. Below is the error:

   Compiling ch16_fearless_concurrency v0.1.0 (/home/eiger/Tutorials/rust-tutorial/chapters/ch16_fearless_concurrency)
error[E0277]: `Rc<i32>` cannot be sent between threads safely
   --> src/mutex_demo.rs:41:41
    |
41  |               let handler = thread::spawn(move || {
    |                             ------------- ^------
    |                             |             |
    |  ___________________________|_____________within this `{closure@src/mutex_demo.rs:41:41: 41:48}`
    | |                           |
    | |                           required by a bound introduced by this call
42  | |                 let number = *number;
43  | |                 println!("[{}] The number is: {}", i, number);
44  | |             });
    | |_____________^ `Rc<i32>` cannot be sent between threads safely
    |
    = help: within `{closure@src/mutex_demo.rs:41:41: 41:48}`, the trait `Send` is not implemented for `Rc<i32>`, which is required by `{closure@src/mutex_demo.rs:41:41: 41:48}: Send`
note: required because it's used within this closure
   --> src/mutex_demo.rs:41:41
    |
41  |             let handler = thread::spawn(move || {
    |                                         ^^^^^^^
note: required by a bound in `spawn`
   --> /home/eiger/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:691:8
    |
688 | pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    |        ----- required by a bound in this function
...
691 |     F: Send + 'static,
    |        ^^^^ required by this bound in `spawn`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `ch16_fearless_concurrency` (bin "ch16_fearless_concurrency" test) due to 1 previous error

This is made possible by the marker traits std::marker::Send and std::marker::Sync. These two traits have no methods to implement. They only tell the compiler whether a type satisfies certain thread-safety conditions.

Send

Send allows ownership of a value to be transferred between threads. Almost all Rust types are Send, and any type composed entirely of Send types is automatically Send. Exceptions include Rc<T>. Rc<T> is not Send because its implementation doesn’t guarantee safe concurrent updates of the ref count. Were we allowed to transfer ownership of an Rc<T> between threads, we could hold an Rc<T> in thread 1, clone it and move the clone to thread 2, after which both threads could update the ref count concurrently.

Leaving Rc<T> non-Send allows an efficient, non-atomic implementation of the ref count mechanism. When Send is needed, we should use std::sync::Arc.

Because Rc<T> is not marked as Send, there is no way for us to accidentally use Rc<T> in a concurrent setting at runtime: such an error is reported at compile time.
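
A generic function with a Send bound is a simple way to ask the compiler whether a type is Send. The helper is_send below is a hypothetical name used only for this sketch; it compiles only for types that implement Send.

```rust
use std::sync::Arc;

// Hypothetical helper: a call to it compiles only when T is Send.
fn is_send<T: Send>() -> bool {
    true
}

fn main() {
    println!("{}", is_send::<i32>());
    println!("{}", is_send::<Arc<i32>>());
    println!("{}", is_send::<std::cell::RefCell<i32>>());
    // Uncommenting the next line is expected to fail with E0277,
    // because Rc<i32> does not implement Send:
    // println!("{}", is_send::<std::rc::Rc<i32>>());
}
```

The check happens entirely at compile time, which is the same mechanism that rejects Rc in thread::spawn.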

Sync

Sync allows a value to be accessed from multiple threads. More precisely, a type T is Sync if and only if &T is Send (“access a value from a thread must be done via a reference to the value”). Rc<T> is not Sync for the same reason as above. Moreover, RefCell<T> is not Sync because its runtime borrow tracking is not thread-safe.

Note that RefCell<T> is Send whenever T is Send. In the example below, we move a RefCell<i32> into a thread. This is allowed because i32 is Send, which makes RefCell<i32> also Send.

use std::cell::RefCell;
use std::thread;

#[test]
fn send_refcell() {
    let val = RefCell::new(40);
    println!("val: {:?}", val);

    let mut val_ref = val.borrow_mut();
    *val_ref += 1;
    println!("val: {:?}", *val_ref);
    // need to drop the mutable ref to val before moving it into thread.
    drop(val_ref);

    let handle = thread::spawn(move || {
        // RefCell<i32> is moved from one thread to another.
        let mut val_ref_thread = val.borrow_mut();
        *val_ref_thread += 1;
        println!("val: {:?}", *val_ref_thread);
    });

    handle.join().unwrap();
}

Mutex<T> is Sync whenever T is Send.
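
Because a Mutex is Sync, a plain reference to it can be shared between threads. The sketch below assumes std::thread::scope (available in the standard library since Rust 1.63), which is not used elsewhere in this chapter; the helper scoped_counter is a hypothetical name. Scoped threads may borrow from the enclosing stack frame, so no Arc is needed here.

```rust
use std::sync::Mutex;
use std::thread;

// Hypothetical helper: increments a shared counter once per thread.
fn scoped_counter(threads: usize) -> usize {
    let counter = Mutex::new(0usize);

    thread::scope(|s| {
        for _ in 0..threads {
            // Each closure captures &counter; sending that reference to another
            // thread is allowed only because Mutex<usize> is Sync.
            s.spawn(|| {
                *counter.lock().unwrap() += 1;
            });
        }
        // All scoped threads are joined automatically when the scope ends.
    });

    // No other thread can hold the lock anymore, so take the value out.
    counter.into_inner().unwrap()
}

fn main() {
    println!("count = {}", scoped_counter(8));
}
```

Replacing the Mutex with a RefCell in this sketch is expected to be rejected by the compiler, since RefCell is not Sync.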

Asynchronous Programming