#StackBounty: #multithreading #asynchronous #rust #rust-tokio Multithreaded Client that sends data in a queue and stores data in anothe…

Bounty: 50

I’m having difficulties in making a Tokio client that receives packets from a server and stores them in a queue for the main thread to process, while being able to send packets to the server from another queue at the same time.

I’m trying to make a very simple online game demonstration, having a game client that Sends data (it’s own modified states, like player movement) and receives data (Game states modified by other players & server, like an NPC/other players that also moved).

The idea is to have a network thread that accesses two Arcs holding Mutexes to Vec<bytes::Bytes> that store serialized data. One Arc is for IncomingPackets, and the other for OutgoingPackets. IncomingPackets would be filled by packets sent from the server to the client that would be later read by the main thread, and OutgoingPackets would be filled by the main thread with packets that should be sent to the server.

I can’t seem to receive or send packets in another thread.

The client would only connect to the server, and the server would allow many clients (which would be served individually).

The explanations around stream’s usage and implementation are not newbie-friendly, but I think I should be using them somehow.

I wrote some code, but it does not work and is probably wrong.

(My original code does not compile, so treat this as pseudocode, sorry)

Playground

extern crate byteorder; // 1.3.4
extern crate futures; // 0.3.5
extern crate tokio; // 0.2.21 

use bytes::Bytes;
use futures::future;
use std::error::Error;
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;

use byteorder::{BigEndian, WriteBytesExt};
use std::io;
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::{ReadHalf, WriteHalf};

//This is the SharedPackets struct that is located in the crate structures
struct SharedPackets {
    data: Mutex<Vec<bytes::Bytes>>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    let (mut r, mut w) = stream.split();

    let mut inc: Vec<bytes::Bytes> = Vec::new();
    inc.push(Bytes::from("Wow"));

    let mut incoming_packets = Arc::new(SharedPackets {
        data: Mutex::new(inc),
    });

    let mut outg: Vec<bytes::Bytes> = Vec::new();
    outg.push(Bytes::from("Wow"));
    let mut outgoint_packets = Arc::new(SharedPackets {
        data: Mutex::new(outg),
    });

    let mut local_incoming_packets = Arc::clone(&incoming_packets);
    let mut local_outgoint_packets = Arc::clone(&outgoint_packets);
    let mut rarc = Arc::new(Mutex::new(r));
    let mut warc = Arc::new(Mutex::new(w));

    tokio::spawn(async move {
        //send and receive are both async functions that contain an infinite loop
        //they basically use AsyncWriteExt and AsyncReadExt to manipulate both halves of the stream
        //send reads the queue and write this data on the socket
        //recv reads the socket and write this data on the queue
        //both "queues" are manipulated by the main thread
        let mut read = &*rarc.lock().unwrap();
        let mut write = &*warc.lock().unwrap();

        future::try_join(
            send(&mut write, &mut local_outgoint_packets),
            recv(&mut read, &mut local_incoming_packets),
        )
        .await;
    });

    loop {
        //read & write other stuff on both incoming_packets & outgoint_packets
        //until the end of the program
    }
}

async fn recv(reader: &mut ReadHalf<'_>, queue: &mut Arc<SharedPackets>) -> Result<(), io::Error> {
    loop {
        let mut buf: Vec<u8> = vec![0; 4096];

        let n = match reader.read(&mut buf).await {
            Ok(n) if n == 0 => return Ok(()),
            Ok(n) => n,
            Err(e) => {
                eprintln!("failed to read from socket; err = {:?}", e);
                return Err(e);
            }
        };
    }
}

async fn send(writer: &mut WriteHalf<'_>, queue: &mut Arc<SharedPackets>) -> Result<(), io::Error> {
    loop {
        //task::sleep(Duration::from_millis(300)).await;
        {
            let a = vec!["AAAA"];
            for i in a.iter() {
                let mut byte_array = vec![];
                let str_bytes = i.as_bytes();
                WriteBytesExt::write_u32::<BigEndian>(&mut byte_array, str_bytes.len() as u32)
                    .unwrap();
                byte_array.extend(str_bytes);

                writer.write(&byte_array).await?;
            }
        }
    }
}

This does not compile:

error: future cannot be sent between threads safely
   --> src/main.rs:46:5
    |
46  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    | 
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::spawn`
    |
    = help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::tcp::ReadHalf<'_>>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:55:9
    |
52  |           let mut read = &*rarc.lock().unwrap();
    |                            -------------------- has type `std::sync::MutexGuard<'_, tokio::net::tcp::ReadHalf<'_>>` which is not `Send`
...
55  | /         future::try_join(
56  | |             send(&mut write, &mut local_outgoint_packets),
57  | |             recv(&mut read, &mut local_incoming_packets),
58  | |         )
59  | |         .await;
    | |______________^ await occurs here, with `rarc.lock().unwrap()` maybe used later
60  |       });
    |       - `rarc.lock().unwrap()` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow
   --> src/main.rs:52:25
    |
52  |         let mut read = &*rarc.lock().unwrap();
    |                         ^^^^^^^^^^^^^^^^^^^^^

error: future cannot be sent between threads safely
   --> src/main.rs:46:5
    |
46  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    | 
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::spawn`
    |
    = help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::tcp::WriteHalf<'_>>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:55:9
    |
53  |           let mut write = &*warc.lock().unwrap();
    |                             -------------------- has type `std::sync::MutexGuard<'_, tokio::net::tcp::WriteHalf<'_>>` which is not `Send`
54  | 
55  | /         future::try_join(
56  | |             send(&mut write, &mut local_outgoint_packets),
57  | |             recv(&mut read, &mut local_incoming_packets),
58  | |         )
59  | |         .await;
    | |______________^ await occurs here, with `warc.lock().unwrap()` maybe used later
60  |       });
    |       - `warc.lock().unwrap()` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow
   --> src/main.rs:53:26
    |
53  |         let mut write = &*warc.lock().unwrap();
    |                          ^^^^^^^^^^^^^^^^^^^^^

I think this is the least of the problems, because I’m really new with tokio.

I could not find an example of this, do you know any performant approach to this problem?


Get this bounty!!!

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.