#Arc<Mutex<T>> Memory Leak

23 messages · Page 1 of 1 (latest)

lofty sinew
#

Why does Arc<Mutex<T>> potentially cause a memory leak?
I have some problematic code that is leaking memory

everything below is running in a async task so there are multiple spawns occuring.

type TasksStore = Arc<Mutex<HashMap<Arc<StreamRequest>, HashSet<Recipient<ExportData>>>>>;

/// Async task to be spawned by actor context.
/// Will subscribe and read strings from the ZMQ socket, based off of the requested and send to all recipeients in the
/// TasksStore corresponding to the channel requested
pub async fn process_read_task(
    channel: Arc<StreamRequest>,
    socket: Arc<zmq::Socket>,
    tasks_store: TasksStore,
    dispatch_host: String,
) {
    if !create_topic(&channel, &dispatch_host).await.is_ok() {
        log::error!("Failed to create topic: {:?}", &channel.symbol);
        return;
    }

    log::info!("Spawned task for: {}", &channel);
    let topic = &channel.to_topic();
    let sub_stat = socket.set_subscribe(topic.as_bytes()).is_ok();

    log::info!("Socket Subscribe status: {}", sub_stat);

    loop {
        let recipients = {
            let tasks_store_lock = tasks_store.lock().unwrap();
            tasks_store_lock.get(&channel).cloned()
        };

        if let Some(recipients) = &recipients {
            for recipient in recipients {
                match socket.recv_string(zmq::DONTWAIT) {
                    Ok(message) => match_send(match_request(message.unwrap()), recipient, &channel),
                    Err(zmq::Error::EAGAIN) => {
                        tokio::time::sleep(tokio::time::Duration::from_nanos(1)).await
                    }
                    Err(e) => log::error!("Error while recieving message {:?}", e),
                }
                tokio::time::sleep(tokio::time::Duration::from_nanos(1)).await;
            }
            if recipients.is_empty() {
                break;
            }
        } else {
            break;
        }
    }
}
hot galleon
#

It doesn't look like this code is allocating anything so it's not the issue. Whenever you have a collection of heap allocations, there's a possibility of fragmentation.

lofty sinew
#

Well there is this mess of logic

let socket = self.zmq_socket.clone();

                let mut tasks = self.tasks.lock().unwrap();
                match tasks.entry(channel.clone()) {
                    Entry::Occupied(mut found) => {
                        found.get_mut().insert(recipient);
                    }
                    Entry::Vacant(unique) => {
                        let mut new_set = HashSet::new();
                        new_set.insert(recipient);
                        unique.insert(new_set);
                        let read_task = process_read_task(
                            channel.clone(),
                            socket.clone(),
                            self.tasks.clone(),
                            self.dispatch_details.1.clone(),
                            self.cache.clone(),
                        );
                        self.handles
                            .insert(channel.clone(),         ctx.spawn(read_task.into_actor(self)));

#

Which gets called on pretty regularly

hot galleon
#

There's always a possibility it's just using a bunch of memory and not technically leaking.

lofty sinew
#

I know for a fact that its leaking because we watched the memory climb from 40Mb to 500Mb

hot galleon
#

A leak would happen if you have cyclic arc refs or you're calling specifically leaky functions, like Box::leak

#

You can use 500MB without being leaky. Discord does it lol.

lofty sinew
#

Yeah but thats JS and chromium for you.

#

This code (a server in actuality) was aiming to be very memory efficient

#

kinda missed the mark on that one huh...

hot galleon
craggy seal
#

Because of the spikes from the gc?

lofty sinew
#

desktop app not server

jagged bobcat
#

"it's not leaking unless you can show the leak in valgrind" - a rule in the book of unofficial rules

#

fragmentation is real, it shows up in RSS all the time

lofty sinew
#

I was wondering what mem profiler to use

jagged bobcat
#

allocate less, or pool allocations

lofty sinew
#

Ideally do both

jagged bobcat
#

jemalloc has a ton of stats it can spit out if you switch to using that, or use something like KDE/heaptrack to actually profile the allocations taking place

#

heaptrack is good for understanding what the makeup of your heap is, and the jemalloc stats are more about "ok, but why are you still holding on to so much memory which should have been given back to the OS already?"

lofty sinew
#

so long as it tells me what function is making allocations and if its getting dropped or not