#implement concurrency queue
4 messages · Page 1 of 1 (latest)
Hi @tulip warren. I'm kapa.ai, your friendly support bot. I will try to answer your question by searching through the knowledge base. :wave:
Based on the knowledge sources, here's how to implement a concurrency-controlled queue using Workpool in Convex:
Basic Setup
With Workpool already installed, you can control concurrency by setting the maxParallelism option when creating your workpool instance:
const pool = new Workpool(components.yourWorkpool, {
maxParallelism: 5, // Process 5 items concurrently
});
Processing Multiple Lines of Data
To process multiple items with controlled concurrency, use the enqueueActionBatch method. According to the Workpool documentation:
await pool.enqueueActionBatch(ctx, internal.weather.scrape, [
{ city: "New York" },
{ city: "Los Angeles" },
{ city: "Chicago" },
]);
Complete Example
Here's a practical example for processing data with controlled concurrency:
const dataPool = new Workpool(components.dataWorkpool, {
maxParallelism: 10, // Control how many items process at once
});
export const processDataBatch = mutation({
args: { items: v.array(v.any()) },
handler: async (ctx, args) => {
// Enqueue all items at once, but they'll be processed
// with the specified parallelism limit
await dataPool.enqueueActionBatch(
ctx,
internal.processing.processItem,
args.items.map(item => ({ data: item }))
);
},
});
Key Benefits
As noted in the Workpool documentation, batching helps by:
- Reducing the number of calls to the component, which reduces overhead
- Reducing the number of mutations that might conflict with each other
Parallelism Limits
According to the workflow documentation, you should:
- Not exceed 20 on the free tier
- Not exceed 100 on Pro accounts (across all workpools and workflows)