#SplitSink trait bounds not met


cunning drift
#

I passed an Arc (atomically reference-counted pointer) to the SplitSink from open_connection() -> handle_payload() -> send_heartbeat(). I want to use the sink to send messages, but when I try to call the send(&self) method on the sink, it gives me this error:

```
the method send exists for reference &SplitSink<futures::stream::MapErr<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, S>, tokio_tungstenite::tungstenite::Message>, but its trait bounds were not satisfied
the following trait bounds were not satisfied:
&SplitSink<futures::stream::MapErr<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, S>, tokio_tungstenite::tungstenite::Message>: futures::Sink<_>
which is required by &SplitSink<futures::stream::MapErr<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, S>, tokio_tungstenite::tungstenite::Message>: SinkExt<_>
SplitSink<futures::stream::MapErr<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, S>, tokio_tungstenite::tungstenite::Message>: futures::Sink<_>
which is required by SplitSink<futures::stream::MapErr<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, S>, tokio_tungstenite::tungstenite::Message>: SinkExt<_>
```


Here is the source:

```rs
async fn open_connection(&self) {
    //**
    let (sink, stream) = ws_stream.split();

    let sink_sh = Arc::new(sink); // <-- allows me to call send()

    stream.try_for_each(|m| async {
        //**
        self.handle_payload(payload, &sink_sh).await //<-- passed off
    }).await?;
}

async fn handle_payload(&self, payload: Payload, sink: &Arc<Sink<S>>) {
    //**
    let sh_sink = sink.clone(); //cannot call send()
    tokio::spawn(async move {
        loop {
            interval.tick().await;
            send_heartbeat(sh_sink.as_ref()).await; //<-- passed
        }
    });
}
```

I'm assuming the vague generics are causing object slicing, but I'm not sure what to do.

ember salmon
#

By "object slicing" do you mean the C++ concept? Because that's not a thing in Rust: we don't have classes and inheritance.

#

So, send seems to be futures::sink::SinkExt::send. For that method to exist, you need the type to implement SinkExt.
There is a blanket impl of SinkExt<Item> for T: Sink<Item> + ?Sized, so every Sink is also a SinkExt with the same item type
Now, a SplitSink<S, Item> is a Sink<Item>, but that's not what you have

#

you have a &SplitSink<S, Item>

#

For this to work, there would need to be an impl<S, Item> Sink<Item> for &S where S: Sink<Item> (read: an impl saying that &S is a Sink<Item> if S is)

#

There is no such impl

#

Unsurprisingly, since the Sink methods all need mutable access (they take Pin<&mut Self>). There is an impl like that for &mut S, though.
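To make the impl situation above concrete, here is a std-only sketch. MiniSink and MiniSinkExt are made-up stand-ins for futures::Sink and SinkExt (the real traits are poll-based and work through Pin), and Collector, send_twice and demo are hypothetical names. Only the shape of the impls is meant to match: a blanket extension impl, a forwarding impl for &mut S, and nothing for &S.

```rust
// Toy stand-in for futures::Sink: the core method needs mutable access.
trait MiniSink<Item> {
    fn push(&mut self, item: Item);
}

// Toy stand-in for SinkExt: an extension trait with a blanket impl,
// so every MiniSink automatically gets `send`.
trait MiniSinkExt<Item>: MiniSink<Item> {
    fn send(&mut self, item: Item) {
        self.push(item);
    }
}
impl<Item, T: MiniSink<Item> + ?Sized> MiniSinkExt<Item> for T {}

// Forwarding impl for &mut S. There is deliberately no matching impl
// for &S: a shared reference cannot call a `&mut self` method.
impl<Item, S: MiniSink<Item> + ?Sized> MiniSink<Item> for &mut S {
    fn push(&mut self, item: Item) {
        (**self).push(item);
    }
}

// Hypothetical sink that just records what it was sent.
struct Collector(Vec<u32>);

impl MiniSink<u32> for Collector {
    fn push(&mut self, item: u32) {
        self.0.push(item);
    }
}

// Accepts anything that is a MiniSink, including `&mut Collector`.
fn send_twice<S: MiniSink<u32>>(mut sink: S, item: u32) {
    sink.send(item);
    sink.send(item);
}

fn demo() -> Vec<u32> {
    let mut collector = Collector(Vec::new());
    send_twice(&mut collector, 7); // ok: &mut Collector is a MiniSink
    // send_twice(&collector, 7);  // does not compile: no impl for &Collector
    collector.0
}
```

Calling demo() should leave two copies of the item in the collector. Uncommenting the &collector line gives a trait-bound error in the same spirit as the one in the question.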

#

However, an Arc doesn't let you get a &mut to its contents

#

You'd need something like a Mutex too.
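A small std-only sketch of that point, with a plain counter standing in for the sink (demo_shared_counter is a hypothetical name): a shared Arc on its own won't give out a &mut, while Arc<Mutex<T>> lets every clone mutate the value through the lock.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn demo_shared_counter() -> (bool, u32) {
    // A bare Arc: once a second handle exists, get_mut returns None,
    // so there is no way to obtain a &mut to the contents.
    let mut plain = Arc::new(0u32);
    let _other = Arc::clone(&plain);
    let got_mut = Arc::get_mut(&mut plain).is_some();

    // Arc<Mutex<T>>: each clone can lock and mutate the shared value.
    let shared = Arc::new(Mutex::new(0u32));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                *shared.lock().unwrap() += 1;
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }

    let total = *shared.lock().unwrap();
    (got_mut, total)
}
```

Here got_mut comes back false and the counter ends at 4, one increment per thread.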

#

Alternatively, you could try to pass handle_payload a &mut Sink<S> directly (no idea if that would work with try_for_each, but it may). It would not work with the tokio::spawn, though.

cunning drift
#

Not a fan of creating massive functions but trying to split responsibility is starting to become a bigger hassle than it's worth

ember salmon
#

Honestly if you unsplit these two functions you'd have the same issue

cunning drift
#

So do I do something like this?

    let sink_sh = Arc::new(Mutex::new(sink));

    stream.try_for_each(|m| async {
        self.handle_payload(payload, &sink_sh).await
    }).await?;

    async fn handle_payload<S>(&self, payload: Payload, sink: &Arc<Mutex<Sink<S>>>) -> Result<()> {

    }

How would I get a mutable instance?

ember salmon
#

Lock the mutex: sink.lock().unwrap() (with std::sync::Mutex, lock() returns a LockResult, hence the unwrap). This gives you a MutexGuard, which you can mostly use as if it were a mutable reference (it coerces to one). If it fails to coerce, use &mut *guard to force it.

#

The mutex is unlocked when you drop the guard
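Roughly what that looks like with std::sync::Mutex, using a Vec<String> in place of the sink (append and demo_guard are hypothetical names for this sketch). It shows the explicit reborrow, the deref coercion, and the lock being released when the guard goes out of scope.

```rust
use std::sync::Mutex;

// Takes a plain mutable reference, knows nothing about mutexes.
fn append(log: &mut Vec<String>, line: &str) {
    log.push(line.to_string());
}

fn demo_guard() -> (bool, bool, usize) {
    let log = Mutex::new(Vec::<String>::new());
    let locked_while_held;
    {
        let mut guard = log.lock().unwrap();
        // Explicit reborrow: &mut *guard is a plain &mut Vec<String>.
        append(&mut *guard, "heartbeat");
        // Deref coercion: &mut MutexGuard<Vec<String>> becomes &mut Vec<String>.
        append(&mut guard, "heartbeat ack");
        // While the guard is alive, the mutex cannot be locked again.
        locked_while_held = log.try_lock().is_err();
    } // guard dropped here, which unlocks the mutex

    let unlocked_after_drop = log.try_lock().is_ok();
    let len = log.lock().unwrap().len();
    (locked_while_held, unlocked_after_drop, len)
}
```

The first flag is true (locked while the guard lives), the second is true (free again after the drop), and the log holds both lines.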

cunning drift
#

In handle_payload() I pass the mutex to another function called send_heartbeat():

    Hello => {
        let interval = payload.d["heartbeat_interval"].as_i64().unwrap() as u64;
        let mut interval = time::interval(Duration::from_millis(interval));

        let sh_sink = Arc::clone(sink);

        tokio::spawn(async move {
            loop {
                interval.tick().await;
                send_heartbeat(sh_sink).await;
            }
        });
    }

    async fn send_heartbeat<S>(sink: Arc<Mutex<Sink<S>>>) {
        let mut sink = sink.lock().unwrap();
    }

Is this correct?

ember salmon
#

I'm assuming send_heartbeat does some actual work: that snippet doesn't touch the sink at all. But that aside, it's correct

cunning drift
#

Problem is, I still cannot access sink.send()

ember salmon
#

What's the error now?

#

@cunning drift

cunning drift
#

mismatched types
expected reference &SplitSink<futures::stream::MapErr<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, _>, tokio_tungstenite::tungstenite::Message>
found reference &std::sync::Mutex<SplitSink<futures::stream::MapErr<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, S>, tokio_tungstenite::tungstenite::Message>>

ember salmon
#

sink.lock().unwrap().send()

#

It appears your sink is still a mutex, in that error

cunning drift
#

no method named send found for struct std::sync::MutexGuard<'_, SplitSink<futures::stream::MapErr<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, S>, tokio_tungstenite::tungstenite::Message>> in the current scope
method not found in std::sync::MutexGuard<'_, SplitSink<futures::stream::MapErr<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, S>, tokio_tungstenite::tungstenite::Message>>

ember salmon
#

It doesn't deref? Ah, right, too generic.

#

Uh, pick between (&mut *sink.lock().unwrap()).send() and

use std::ops::DerefMut;
sink.lock().unwrap().deref_mut().send()
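Both spellings hand the method call a plain &mut to the guarded value. A std-only sketch, assuming a made-up extension trait PushByte in place of SinkExt and a Vec<u8> in place of the sink (demo_deref is hypothetical too):

```rust
use std::ops::DerefMut;
use std::sync::Mutex;

// Hypothetical extension trait, implemented for Vec<u8> only.
trait PushByte {
    fn push_byte(&mut self, byte: u8);
}

impl PushByte for Vec<u8> {
    fn push_byte(&mut self, byte: u8) {
        self.push(byte);
    }
}

fn demo_deref() -> Vec<u8> {
    let buf = Mutex::new(Vec::<u8>::new());

    // Spelling 1: explicit reborrow through the guard.
    (&mut *buf.lock().unwrap()).push_byte(1);

    // Spelling 2: call DerefMut::deref_mut on the guard.
    buf.lock().unwrap().deref_mut().push_byte(2);

    // Each temporary guard above is dropped at the end of its statement,
    // so locking again here does not deadlock.
    let out = buf.lock().unwrap().clone();
    out
}
```

Either form only helps if the target type really implements the trait, which is the part that still fails in the messages that follow.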

cunning drift
#

    async fn send_heartbeat<S>(sink: Arc<Mutex<Sink<S>>>) {
        sink.lock().unwrap().deref_mut().send();
        println!("Sending heartbeat.");
    }

```
no method named send found for mutable reference &mut SplitSink<futures::stream::MapErr<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, S>, tokio_tungstenite::tungstenite::Message> in the current scope
method not found in &mut SplitSink<futures::stream::MapErr<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, S>, tokio_tungstenite::tungstenite::Message>
```


If I bring SinkExt into scope, then it gives me:

the method send exists for mutable reference &mut SplitSink<futures::stream::MapErr<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, S>, tokio_tungstenite::tungstenite::Message>, but its trait bounds were not satisfied
the following trait bounds were not satisfied:
SplitSink<futures::stream::MapErr<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, S>, tokio_tungstenite::tungstenite::Message>: futures::Sink<_>
which is required by SplitSink<futures::stream::MapErr<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, S>, tokio_tungstenite::tungstenite::Message>: SinkExt<_>
&mut SplitSink<futures::stream::MapErr<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, S>, tokio_tungstenite::tungstenite::Message>: futures::Sink<_>
which is required by &mut SplitSink<futures::stream::MapErr<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, S>, tokio_tungstenite::tungstenite::Message>: SinkExt<_>

ember salmon
#

Hmm. I don't know why, then
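One possible explanation, which the thread itself does not confirm: send_heartbeat<S> puts no bound on S, so inside the function the compiler has no way to prove that the SplitSink built from that S is a Sink at all, even with a &mut in hand. The std-only sketch below reproduces that shape with made-up types. MiniSink stands in for futures::Sink, Wrapper for a wrapper like SplitSink, and Collector, send_heartbeat and demo_bound are hypothetical.

```rust
// Toy stand-in for futures::Sink (the real trait is poll-based).
trait MiniSink<Item> {
    fn push(&mut self, item: Item);
}

// Toy stand-in for a wrapper type: it is only a sink when the
// type it wraps is one.
struct Wrapper<S>(S);

impl<Item, S: MiniSink<Item>> MiniSink<Item> for Wrapper<S> {
    fn push(&mut self, item: Item) {
        self.0.push(item);
    }
}

// Without a bound, nothing is known about S, so this version does not
// compile: Wrapper<S>: MiniSink<u32> cannot be proven.
//
// fn send_heartbeat<S>(sink: &mut Wrapper<S>) {
//     sink.push(1u32);
// }

// Stating the requirement in a where clause makes the method available.
fn send_heartbeat<S>(sink: &mut Wrapper<S>)
where
    Wrapper<S>: MiniSink<u32>,
{
    sink.push(1u32);
}

// Hypothetical concrete sink used to exercise the function.
struct Collector(Vec<u32>);

impl MiniSink<u32> for Collector {
    fn push(&mut self, item: u32) {
        self.0.push(item);
    }
}

fn demo_bound() -> Vec<u32> {
    let mut wrapped = Wrapper(Collector(Vec::new()));
    send_heartbeat(&mut wrapped);
    (wrapped.0).0
}
```

If that is what is going on here, the real fix would presumably be a where clause on handle_payload and send_heartbeat requiring the Sink<S> alias to implement futures::Sink<Message> (plus Unpin, which SinkExt::send asks for), or dropping the generic in favor of the concrete stream type. Treat that as a guess to try rather than a confirmed diagnosis.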