#Code Review Request

19 messages · Page 1 of 1 (latest)

visual swift
#

Hi, i was writing some async server-client connection to learn more rust. However for a long time i was unable to decide which crates should i use. I decided to use tokio as its the most popular, also i heard about flume which is a more efficient way of handling channels. My idea was to make 100% safe code, for some reason panics are not showing up when they appear in a different tokio task so i needed to handle it with Results, i decided to use anyhow because i dont need more complex error handling. So in order for it to work i needed to await every single task to redirect the error which means i needed to introduce graceful closing for everything. After some time I successfully done that. However i think that my pattern isnt the best and is very very uncovienient. I really need someone who is very familiar with rust ecosystem to review this code and propose any changes.

So i think that all i need to put here is the server's code as client was less complex and was using a similar logic.

#

First off, main function:

fn main() -> Result<()> {
    let (sender, receiver) = flume::unbounded();
    let conn_manager = Arc::new(ConnManager::new(sender));
    let cloned = conn_manager.clone();
    let conn_task: JoinHandle<Result<()>> = runtime.spawn(async move { Ok(cloned.run().await?) });
    let cloned = conn_manager.clone();
    let capture_conn_task: JoinHandle<Result<()>> =
        runtime.spawn(async move { Ok(cloned.run_capture_conn_task().await?) });

    conn_task.await??;
    capture_conn_task.await??;

    Ok(())
}
#

so what i was doing, conn_manager struct in this example is not holding its own tasks in fields because i cant create its tasks before it exists, i could have used some Options but i decided not to, i tend to avoid Options but i feel like im not happy with the code that i have here

#

Now how is the ConnManager implemented:

use std::{collections::HashMap, sync::Arc};

use anyhow::Result;
use flume::{Receiver, Sender};
use tokio::{
    sync::{Mutex, Notify},
    task::JoinHandle,
};
use uuid::Uuid;

use crate::{
    conn::connection::ConnMessage,
    gui::{main_gui::MainGuiMessage},
};

use super::{
    tcp::{self, Tcp},
    udp::Udp,
};

pub struct TcpConnection {
    pub inner: tcp::TcpConnection,
    receive_from_client_task: JoinHandle<Result<()>>,
}

impl TcpConnection {
    fn stop(&self) {
        self.inner.stop();
    }

    async fn await_tasks(self) -> Result<()> {
        self.receive_from_client_task.await??;
        self.inner.await_tasks().await?;
        Ok(())
    }
}

pub struct ConnManager {
    tcp: Arc<Tcp>,
    tcp_receiver: Receiver<tcp::TcpConnection>,
    udp: Udp,
    notify_shutdown: Notify,
    pub conns: Arc<Mutex<HashMap<Uuid, TcpConnection>>>,
    gui_sender: Sender<MainGuiMessage>,
}

impl ConnManager {
    pub fn new(gui_sender: Sender<MainGuiMessage>) -> Self {
        let (tcp_sender, tcp_receiver) = flume::unbounded();
        Self {
            tcp: Arc::new(Tcp::new(tcp_sender, gui_sender.clone())),
            tcp_receiver,
            udp: Udp::new(),
            notify_shutdown: Notify::new(),
            conns: Arc::new(Mutex::new(HashMap::new())),
            gui_sender,
        }
    }

    // runs all the tasks that are inactive inside it
    pub async fn run(&self) -> Result<()> {
        let tcp = self.tcp.clone();
        let tcp_task: JoinHandle<Result<()>> =
            tokio::spawn(async move { Ok(tcp.run_receive_task().await?) });

        let udp_task = self.udp.run().await?;

        tcp_task.await??;
        let (udp_res,) = tokio::join!(udp_task);
        udp_res??;

        Ok(())
    }

    // capture connections incoming from tcp
    pub async fn run_capture_conn_task(&self) -> Result<()> {
        println!("starting capture conns```
#

task");
        loop {
            tokio::select! {
                Ok(conn) = self.tcp_receiver.recv_async() => {
                    let uuid = Uuid::new_v4();

                    let addr = conn.inner.addr.clone();
                    println!("Added active client: {}, uuid: {}", addr, uuid);
                    let conns = self.conns.clone();
                    let rx = conn.rx.clone();
                    let inner_conn = conn.inner.clone();
                    let gui_sender = self.gui_sender.clone();
                    let receive_from_client_task = tokio::spawn(async move {
                        println!("starting client message receive task");
                        loop {
                            tokio::select! {
                                Ok(msg) = rx.recv_async() => {
                                    if Self::handle_conn_msg(msg, &addr, uuid, conns.clone(), gui_sender.clone()).await? {
                                        break;
                                    };
                                }
                                _ = inner_conn.notify_shutdown.notified() => {
                                    break;
                                },
                            }
                        }
                        println!("stoped client message receive task");
                        return Ok(());
                    });
                    let conn = TcpConnection{inner: conn, receive_from_client_task};
                    self.conns.lock().await.insert(uuid, conn);
                    self.gui_sender.send_async(MainGuiMessage::AddClient(uuid)).await?;
                }
                _ = self.notify_shutdown.notified() => {
                    println!("ended capture conns task");
                    for (_, conn) in self.conns.lock().await.drain() {
                        conn.await_tasks().await?;
                    }
                    println!("closed all```
#

connections");
                    return Ok(());
                },
            }
        }
    }

    async fn handle_conn_msg(
        msg: ConnMessage,
        addr: &str,
        uuid: Uuid,
        conns: Arc<Mutex<HashMap<Uuid, TcpConnection>>>,
        gui_sender: Sender<MainGuiMessage>,
    ) -> Result<bool> {
        match msg {
            // here we only take responsibility of removing the client from the list and awaiting for some tasks which isnt too important i guess
            ConnMessage::CloseConn => {
                println!("client send close, removing it");
                if let Some(conn) = conns.lock().await.remove(&uuid) {
                    conn.stop();
                    // WARNING: We throw out the receive_from_client_task result as it is not important when client decided to end connection, the task will close anyway cuz of the bool return
                    conn.inner.await_tasks().await?;
                    println!("Removed active client: {}, uuid: {}", addr, uuid);
                    gui_sender
                        .send_async(MainGuiMessage::RemoveClient(uuid))
                        .await?;
                    return Ok(true);
                } else {
                    println!("CLIENT WASNT FOUND!!!")
                }
            }
        }
        Ok(false)
    }

    pub async fn stop(&self) {
        println!("stopping connection manager");
        self.notify_shutdown.notify_waiters();
        self.tcp.stop();
        self.udp.stop();
        for (_, conn) in self.conns.lock().await.iter() {
            conn.stop();
        }
    }
}
boreal crown
#

from a quick look i'd say that you fell into the trap of trying to create an oop like shared data design

#

for example, you throw all connections into an Arc<Mutex<T>> instead of letting every task own its connection

#

this is not only more ergonomic but also faster since having what's essentially a global lock negates all benefits of async tasks

boreal crown
visual swift
#

should i then fully rely on receivers and senders?

visual swift
#

but yeah i could get rid of additional struggle by removing this Mutex and instead making it channel based

#

but idk if thats what you meant, maybe there is a better way

#

@boreal crown idk if i should ping you but i really want to know

#

i can wait

boreal crown
# visual swift should i then fully rely on receivers and senders?

yea, senders and receivers are the way to go there imo
if you really need to often modify their state from the outside, you could also use mutices but only if you make a single mutex hold only one TcpConnection at a time (and if the additional latency from blocking on it is fine with you)