Why does Arc<Mutex<T>> potentially cause a memory leak?
I have some problematic code that is leaking memory
everything below is running in a async task so there are multiple spawns occuring.
type TasksStore = Arc<Mutex<HashMap<Arc<StreamRequest>, HashSet<Recipient<ExportData>>>>>;
/// Async task to be spawned by actor context.
/// Will subscribe and read strings from the ZMQ socket, based off of the requested and send to all recipeients in the
/// TasksStore corresponding to the channel requested
pub async fn process_read_task(
channel: Arc<StreamRequest>,
socket: Arc<zmq::Socket>,
tasks_store: TasksStore,
dispatch_host: String,
) {
if !create_topic(&channel, &dispatch_host).await.is_ok() {
log::error!("Failed to create topic: {:?}", &channel.symbol);
return;
}
log::info!("Spawned task for: {}", &channel);
let topic = &channel.to_topic();
let sub_stat = socket.set_subscribe(topic.as_bytes()).is_ok();
log::info!("Socket Subscribe status: {}", sub_stat);
loop {
let recipients = {
let tasks_store_lock = tasks_store.lock().unwrap();
tasks_store_lock.get(&channel).cloned()
};
if let Some(recipients) = &recipients {
for recipient in recipients {
match socket.recv_string(zmq::DONTWAIT) {
Ok(message) => match_send(match_request(message.unwrap()), recipient, &channel),
Err(zmq::Error::EAGAIN) => {
tokio::time::sleep(tokio::time::Duration::from_nanos(1)).await
}
Err(e) => log::error!("Error while recieving message {:?}", e),
}
tokio::time::sleep(tokio::time::Duration::from_nanos(1)).await;
}
if recipients.is_empty() {
break;
}
} else {
break;
}
}
}