#Too many Arc<Mutex<T>>'s?


serene fable
#

I'm creating a local library for my project that connects to a WebSocket server and generalizes some of the events.

The basic idea is: connect, send a message, read the stream into a buffer, and then send the data out via tokio::sync::watch. I'm finding that I'm using either channels or mutexes to handle the shared state. Is there a strategy that is less reliant on Arc<Mutex<T>> specifically?

Here's the growing mess:

#[derive(Debug)]
pub struct Client {
    connected: bool,
    write: Option<SplitSink<SocketStream, Message>>,
    /// By default this heap-allocated ring buffer has a capacity of `1024`.
    /// This buffer is used for data only.
    data_buffer: Arc<Mutex<VecDeque<(String, String)>>>,
    /// This buffer is only for non-data messages, e.g. Hb, status, info, and warn messages.
    message_buffer: Arc<Mutex<VecDeque<String>>>,
    /// Hold onto rx channels for consumers
    subscriptions: BTreeMap<String, watch::Receiver<String>>,
    /// Hold onto tx channels
    senders: Arc<Mutex<BTreeMap<String, watch::Sender<String>>>>,
}
untold forge
#

What if you held both sides of a channel?
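One possible reading of this suggestion, as a minimal sketch: let the channel itself be the buffer, with the struct owning both halves, so no `Arc<Mutex<VecDeque<_>>>` is needed. The sketch uses `std::sync::mpsc` and a thread as stand-ins for the tokio channel and reader task; this `Client`, `sender`, `drain`, and `demo` are hypothetical names, not the original API. (For `tokio::sync::watch` specifically, keeping only the `Sender` may be enough, since `Sender::subscribe` hands out new receivers.)

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Hypothetical client that owns both halves of a channel instead of an
/// `Arc<Mutex<VecDeque<String>>>`.
pub struct Client {
    /// Cloned and handed to whatever produces messages (e.g. the reader task).
    tx: Sender<String>,
    /// Kept by the client; the channel itself acts as the buffer.
    rx: Receiver<String>,
}

impl Client {
    pub fn new() -> Self {
        let (tx, rx) = mpsc::channel();
        Client { tx, rx }
    }

    /// Give a producer its own sending handle.
    pub fn sender(&self) -> Sender<String> {
        self.tx.clone()
    }

    /// Drain everything that is currently queued, without blocking.
    pub fn drain(&self) -> Vec<String> {
        self.rx.try_iter().collect()
    }
}

/// Stand-in for the socket reader: pushes two messages from another thread,
/// waits for it to finish, then drains the queue on the client side.
pub fn demo() -> Vec<String> {
    let client = Client::new();
    let tx = client.sender();
    thread::spawn(move || {
        tx.send("heartbeat".to_string()).unwrap();
        tx.send("status: ok".to_string()).unwrap();
    })
    .join()
    .unwrap();
    client.drain()
}
```

The trade-off is that a channel only supports queue-style access; if consumers need to inspect or mutate the buffer in place, a mutex is still the simpler tool.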

#

You might also want to put all the Arc fields into a struct, and then wrap the entire struct in Arc.

struct Shared {
  data_buffer: Mutex<VecDeque<(String, String)>>,
  message_buffer: Mutex<VecDeque<String>>,
  ...
}
pub struct Client {
  connected: bool,
  write: Option<SplitSink<SocketStream, Message>>,
  shared: Arc<Shared>
}
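To make the grouping concrete, here is a small sketch of how a background reader might use it. It assumes plain `std` threads and `std::sync::Mutex` in place of tokio tasks, and `spawn_reader` and `buffered` are made-up helper names. The point is that one `Arc::clone` shares every field, while each field keeps its own lock.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// All state touched by more than one task lives here, behind one `Arc`.
#[derive(Default)]
struct Shared {
    data_buffer: Mutex<VecDeque<(String, String)>>,
    message_buffer: Mutex<VecDeque<String>>,
}

pub struct Client {
    shared: Arc<Shared>,
}

impl Client {
    pub fn new() -> Self {
        Client { shared: Arc::new(Shared::default()) }
    }

    /// Stand-in for the reader task: a single `Arc::clone` shares every field.
    pub fn spawn_reader(&self) -> thread::JoinHandle<()> {
        let shared = Arc::clone(&self.shared);
        thread::spawn(move || {
            // Each buffer is locked independently, so the two do not contend.
            shared
                .data_buffer
                .lock()
                .unwrap()
                .push_back(("topic".to_string(), "payload".to_string()));
            shared
                .message_buffer
                .lock()
                .unwrap()
                .push_back("heartbeat".to_string());
        })
    }

    /// Number of queued (data, non-data) entries.
    pub fn buffered(&self) -> (usize, usize) {
        (
            self.shared.data_buffer.lock().unwrap().len(),
            self.shared.message_buffer.lock().unwrap().len(),
        )
    }
}
```

This does not remove the mutexes, but it cuts the reference counting down to one `Arc` and keeps the type signatures readable.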
serene fable
#

hmm

#

I'm actually experimenting with a different approach. Tell me what you think.

#[cfg(test)]
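mod test {
    // Imports assume the `tokio-tungstenite`, `futures-util`, and `url` crates.
    use std::{collections::VecDeque, time::Duration};

    use futures_util::{stream::SplitSink, StreamExt};
    use tokio::{net::TcpStream, time};
    use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
    use url::Url;
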
    type SocketStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
    struct Demo {
        pub write: SplitSink<SocketStream, Message>,
        pub queue: VecDeque<String>,
    }

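    impl Demo {
        // Placeholder bodies: the actual subscribe/unsubscribe frames would be
        // sent through `self.write` here.
        pub async fn subscribe(&mut self) {}
        pub async fn unsubscribe(&mut self) {}
    }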
    #[derive(Debug)]
    enum Cmd {
        Sub,
        Unsub,
    }
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn example() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

        tx.send(Cmd::Sub).unwrap();

        tokio::spawn(async move {
            let (ws_stream, _) =
                connect_async(Url::parse("wss://example.com/ws").expect("url"))
                    .await
                    .unwrap();

            let (w, mut r) = ws_stream.split();
            let mut stan = Demo {
                write: w,
                queue: VecDeque::new(),
            };

            loop {
                tokio::select! {
                    Some(Ok(val)) = r.next() => {
                        // dbg!(&val);
                        stan.queue.push_back(val.to_string());
                    }
                    Some(cmd) = rx.recv() => {
                        match cmd {
                           Cmd::Sub => stan.subscribe().await,
                           Cmd::Unsub => stan.unsubscribe().await,
                        }
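                    }
                    // Stop once both the socket and the command channel are closed;
                    // without an `else` branch, `select!` panics in that case.
                    else => break,
                }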
            }
        });
        time::sleep(Duration::from_secs(2)).await;
        tx.send(Cmd::Unsub).unwrap();
    }
}
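The experiment above is essentially the actor pattern: one task owns the state and everything else talks to it through commands. Below is a minimal sketch of the same idea without any network code, using `std::thread` and `std::sync::mpsc` as stand-ins for `tokio::spawn` and `tokio::sync::mpsc`. The `Snapshot` command, the topic names, and the `spawn_actor` and `demo` helpers are hypothetical additions to show how a caller could read state back; in tokio, a `tokio::sync::oneshot` channel would be the usual choice for the reply.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Commands the owning task understands.
enum Cmd {
    Sub(String),
    Unsub(String),
    /// Ask for a snapshot; the reply travels over its own channel.
    Snapshot(Sender<Vec<String>>),
}

/// Spawns the owner of the state and returns the command handle.
fn spawn_actor() -> (Sender<Cmd>, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<Cmd>();
    let handle = thread::spawn(move || {
        // Plain owned state: no Arc, no Mutex.
        let mut topics: VecDeque<String> = VecDeque::new();
        // The loop ends when every Sender has been dropped.
        for cmd in rx {
            match cmd {
                Cmd::Sub(t) => topics.push_back(t),
                Cmd::Unsub(t) => topics.retain(|x| x != &t),
                Cmd::Snapshot(reply) => {
                    // Ignore the error if the requester went away.
                    let _ = reply.send(topics.iter().cloned().collect());
                }
            }
        }
    });
    (tx, handle)
}

/// Sends a few commands, reads the state back, then shuts the actor down.
fn demo() -> Vec<String> {
    let (tx, handle) = spawn_actor();
    tx.send(Cmd::Sub("trades".into())).unwrap();
    tx.send(Cmd::Sub("quotes".into())).unwrap();
    tx.send(Cmd::Unsub("trades".into())).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Cmd::Snapshot(reply_tx)).unwrap();
    let snapshot = reply_rx.recv().unwrap();

    // Dropping the last sender lets the actor loop finish.
    drop(tx);
    handle.join().unwrap();
    snapshot
}
```

Because commands are processed one at a time by the owner, the state never needs a lock; the cost is that every read becomes a round trip through the channel.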