#implement concurrency queue


tulip warren

How do I implement this? I have Workpool installed. How can I process multiple lines of data with a controlled concurrency limit?

cerulean jasper BOT

Hi @tulip warren. I'm kapa.ai, your friendly support bot. I will try to answer your question by searching through the knowledge base. :wave:

Based on the knowledge sources, here's how to implement a concurrency-controlled queue using Workpool in Convex:

Basic Setup

With Workpool already installed, you can control concurrency by setting the maxParallelism option when creating your workpool instance:

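import { Workpool } from "@convex-dev/workpool";
import { components } from "./_generated/api";

const pool = new Workpool(components.yourWorkpool, {
  maxParallelism: 5, // Run at most 5 items concurrently
});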
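
To make the effect of a parallelism cap concrete, here is a minimal standalone TypeScript sketch of the general idea. It is not how Workpool is implemented, and runWithLimit is a hypothetical helper that exists only for illustration: it starts up to `limit` runners, and each runner picks up the next unclaimed item when its current one finishes.

```typescript
// Hypothetical helper (not part of Workpool): runs `worker` over `items`
// with at most `limit` calls in flight at any moment.
async function runWithLimit<T, R>(
  items: T[],
  limit: number,
  worker: (item: T) => Promise<R>,
): Promise<R[]> {
  if (!Number.isInteger(limit) || limit < 1) {
    throw new RangeError("limit must be a positive integer");
  }
  const results = new Array<R>(items.length);
  let next = 0; // index of the next unclaimed item

  // Each runner claims items one at a time until none remain.
  async function runner(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index]);
    }
  }

  const runnerCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: runnerCount }, () => runner()));
  return results; // same order as `items`
}
```

Workpool applies the same kind of cap on the server side, so you only set maxParallelism rather than writing a limiter like this yourself.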

Processing Multiple Lines of Data

To process multiple items with controlled concurrency, use the enqueueActionBatch method. According to the Workpool documentation:

await pool.enqueueActionBatch(ctx, internal.weather.scrape, [
  { city: "New York" },
  { city: "Los Angeles" },
  { city: "Chicago" },
]);

Complete Example

Here's a practical example for processing data with controlled concurrency:

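import { Workpool } from "@convex-dev/workpool";
import { v } from "convex/values";
// Paths assume this file sits directly inside your convex/ directory
import { components, internal } from "./_generated/api";
import { mutation } from "./_generated/server";

const dataPool = new Workpool(components.dataWorkpool, {
  maxParallelism: 10, // Control how many items process at once
});

export const processDataBatch = mutation({
  args: { items: v.array(v.any()) },
  handler: async (ctx, args) => {
    // Enqueue all items at once, but they'll be processed
    // with the specified parallelism limit.
    // internal.processing.processItem is a placeholder for your own
    // internal action, which must accept { data } as its arguments.
    await dataPool.enqueueActionBatch(
      ctx,
      internal.processing.processItem,
      args.items.map(item => ({ data: item }))
    );
  },
});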
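
If your input starts out as raw multi-line text, one way to get from lines to per-item arguments is sketched below. linesToArgs is a hypothetical helper, and the { data } shape is only an assumption carried over from the example above; adjust both to whatever your action expects.

```typescript
// Hypothetical helper: turns raw multi-line text into one argument object
// per non-empty line, using the { data } shape from the example above.
function linesToArgs(text: string): { data: string }[] {
  return text
    .split(/\r?\n/) // handle both Unix and Windows line endings
    .map((line) => line.trim())
    .filter((line) => line.length > 0) // skip blank lines
    .map((line) => ({ data: line }));
}
```

The returned array has the same shape as the args.items.map(...) result in the example, so it could be passed as the third argument to enqueueActionBatch.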

Key Benefits

As noted in the Workpool documentation, batching helps by:

  1. Reducing the number of calls to the component, which reduces overhead
  2. Reducing the number of mutations that might conflict with each other

Parallelism Limits

According to the workflow documentation, keep the combined maxParallelism across all of your workpools and workflows at or below:

  • 20 on the free tier
  • 100 on Pro accounts

The Workpool will automatically queue items and process them according to your maxParallelism setting, ensuring controlled concurrency.
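
Because the cap is shared across pools, it can help to add up your maxParallelism values before choosing a new one. The following is a rough sketch only: remainingParallelism and the Tier type are hypothetical names, and the numbers are simply the limits quoted above.

```typescript
type Tier = "free" | "pro";

// Caps copied from the limits quoted above; update them if the docs change.
const TIER_CAP: Record<Tier, number> = { free: 20, pro: 100 };

// Hypothetical helper: returns how much parallelism is left for a tier,
// given the maxParallelism of every pool you already have.
// A negative result means the configured total exceeds the cap.
function remainingParallelism(tier: Tier, configured: number[]): number {
  const total = configured.reduce((sum, n) => sum + n, 0);
  return TIER_CAP[tier] - total;
}
```

For example, with pools of 5 and 10 on the free tier, remainingParallelism("free", [5, 10]) leaves 5 for any additional pool.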
