Consider adding support for condition-based logic in workflows, such as a step.waitFor(conditionQuery) or similar API, which would allow a workflow to pause execution until a specified query or condition is met, or a timeout occurs. This would simplify workflow code and make it easier to build event-driven and reactive backend processes.
#Condition-Based Waiting in Convex Workflows
26 messages · Page 1 of 1 (latest)
Does this capture the request? https://github.com/get-convex/workflow/issues/19
Also related (but less likely to be the next feature): https://github.com/get-convex/workflow/issues/25
@median marsh has issue#19 been implemented or is that still in the works? This would be nice to have
No it's on my short list but doesn't exist yet. I've started it in a branch, could share an early alpha to folks in that issue
Hi @median marsh
I found the branch you mentioned in the workflow repository, but I can't find the resume method in the workflow manager client.
Can you please let us know the current status of this feature?
Thank you so much for implementing this feature as it's going to help us remove hundreds of lines of code that needed to be written for different chains of workflow.define() to model a complicated state machine.
@median marsh I forked the workflow repo and added the resume method in workflow mnager and also did some refactoring.
Just sharing the PR for your review and get your feedback since I really need this feature in production. It seems to work fine but feel free to highlight anything that can be improved to make it production-ready.
Thank you so much 🙏
Thanks! I meant to implement something last night. I'd love to see some examples where you're using it, as I decided over the course of the week that something structured like a promise might be more ergonomic. But I'd love to see some workflow examples in the wild to gauge which is easier to use
- Create a promise somewhere
const promiseId = await workflow.createPromise(ctx, workflowId); // step can be used in place of ctx
- Wait for the promise in the workflow
const result = await step.awaitPromise(promiseId, { validator });
- Asynchronously resolve / reject the promise
await workflow.resolvePromise(ctx, promiseId, value)
// or
await workflow.rejectPromise(ctx, promiseId, error)
Here is an example of how i'm using workflow with multiple pause and resume points:
https://gist.github.com/a-fatah/26cad260680602e0453909f0f82481f8
This is super helpful, thanks! One question: what do you think about doing something like
const promiseId = await step.runAction(internal.subscriptionTrial.sendUpgradePrompt, {
userId: args.userId,
email: args.email,
});
const userDecision = await step.awaitPromise(promiseId, {
validator: v.union(
v.object({
decision: v.literal("upgrade"),
paymentMethodId: v.string(),
}),
v.object({ decision: v.literal("decline") }),
)
});
instead? The main thing being that you create promises in your own mutation / actions, vs. implicitly in the onPauseHandler - then you can await them, even if they're resolved ahead of time
this makes sense to me and i see that this will solve the ahead-of-time resolution issue.
i am thinking of how we can decouple the actions/mutations to mint the promise id or signal that we can await on.
since we have a workflow.createPromise() api so we should be able to create promise id in the workflow and inject it into the actions/mutations to include it the outbound emails etc.
the is a great mental model for implementation but i'm not sure it's the best from the workflow authoring point of view.
However, can we consider the terms signal or checkpoint instead of promise as they better describe an external event that unblocks the workflow. This could also affect the naming internally like having a signals table instead of promises which sounds like a low-level javascript construct while signal sounds like a high-level abstraction.
i also want to share some ideas that i thought about regarding the api design that makes sure that promise/signal handles exist before the workflow blocks:
const upgradeSignal = await workflow.signals.create(step.workflowId, {
name: "waitUserDecision",
returns: decisionValidator,
});
await step.runAction(internal.subscriptionTrial.sendUpgradePrompt, {
userId: args.userId,
email: args.email,
token: upgradeSignal,
});
const decision = await step.awaitSignal(upgradeSignal);
this is basically the same idea that you shared but just renames promise to signal and injects it as token instead of mutations/actions creating and returning them.
following is just an idea, not 100% sure if it's possible to implement this in a type-safe way:
const sendPrompt = workflow.signals.bind(
internal.subscriptionTrial.sendUpgradePrompt,
); // ensures that signalId can be injected to the action/mutation
const upgrade = await workflow.signals.create(step, {
name: "waitUserDecision",
returns: decisionValidator,
/**
runs immediately or scheduled after the signal is created
and receives injected signalId
*/
onCreated: sendPrompt({
userId: args.userId,
email: args.email,
}),
});
const decision = await workflow.signals.await(upgrade, { timeout: 7 * DAY })
Hi @median marsh
Just sharing an update on an api design idea that i worked on just for fun
I managed to implement following design that returns signals api along with mutation that can be used to resolve them in a type-safe way.
const workflow = new WorkflowManager(components.workflow);
export const { mutation: workflowMutation, signals } = workflow.define({
args: {
orderId: v.string(),
amount: v.number(),
},
signals: {
paymentReceived: v.object({
paymentId: v.string(),
amount: v.number(),
status: v.string(),
}),
orderShipped: v.object({
trackingNumber: v.string(),
carrier: v.string(),
}),
},
async handler(ctx, args) {
console.log(`Starting payment workflow for order ${args.orderId}, amount: $${args.amount}`);
const paymentSignal = await ctx.signals.create("paymentReceived");
const shippingSignal = await ctx.signals.create("orderShipped");
const payment = await ctx.signals.awaitSignal(paymentSignal);
if (payment.status !== "completed") {
return {
success: false,
reason: "Payment failed",
orderId: args.orderId,
};
}
console.log(`Waiting for shipping confirmation...`);
const shipping = await ctx.signals.awaitSignal(shippingSignal);
console.log(`Order shipped! Tracking: ${shipping.trackingNumber} via ${shipping.carrier}`);
return {
success: true,
orderId: args.orderId,
paymentId: payment.paymentId,
trackingNumber: shipping.trackingNumber,
};
},
returns: v.object({
success: v.boolean(),
orderId: v.string(),
paymentId: v.optional(v.string()),
trackingNumber: v.optional(v.string()),
reason: v.optional(v.string()),
}),
});
type-safe resolution:
await signals?.paymentReceived.resolve(
ctx,
signalId,
{
paymentId,
amount: amount || 0,
status: status || "completed",
}
);
let me what you think about this?
Wow, really great stuff @vernal dragon . Oddly enough, I was using the term "signals" before switching to "promise" b/c I thought folks would be more intimidated by signal. I like decoupling the structural definition (e.g. the name & validator) to share between the consumer & resolver.
As for your "onCreated" bind idea - I think that could more naturally look like
const sendPrompt = workflow.signals.bind(
internal.subscriptionTrial.sendUpgradePrompt,
); // ensures that signalId can be injected to the action/mutation
const upgrade = await step.runMutation(internal.subscriptionTrial.sendUpgradePrompt, {
userId: args.userId,
email: args.email,
});
with the sendUpgradePrompt creating and returning the signal. The only bummer is not having the validator type on the signal in the same scope.
I think something like this could be a good low-tech way to "share the signal shape":
const paymentSignal = {
name: "paymentReceived",
validator: v.object({
paymentId: v.string(),
amount: v.number(),
status: v.string(),
}),
timeout: 7 * DAYS,
};
...
await workflow.createSignal(ctx, paymentSignal);
...
await step.awaitSignal(ctx, signalToken, paymentSignal);
...
await workflow.sendSignal(ctx, signalToken, { ...paymentSignal, value });
One thing I was thinking about with signals was to not provide a value. I thought of Promises as a more natural container for holding a value (or being able to reject it). With a strictly binary gate for a signal, you'd have to query the state yourself, but you could do something like:
const result = await step.runMutation(internal.foo.handlePayment, {...args}, { waitFor: [signalToken] });
which is similar to:
const paymentStatus = await step.awaitSignal(step, signalToken, paymentSignal);
const result = await step.runMutation(internal.foo.handlePayment, {paymentStatus }});
but with fewer steps.
The annoying piece is if you are waiting on the signal to decide what to do next, e.g.
const upgrade = await step.awaitSignal(step, signalToken, upgradeSignal);
if (upgrade) {
...
} else {
...
}
Then you're having to break up your logic, or do something like:
const upgrade = await step.runQuery(internal.foo.getUpgradeStatus, { args }, { waitFor: upgradeSignal });
if (upgrade) ...
Thanks @Ian . You are right, promise will be a more appropriate term for a container holding a value and conceptually I also aimed for similar idea but didn't realize that signal can sound like a "binary gate" which may also sound like a change of mental model.
Regarding the idea of passing an object to create, await and send methods, i think it can be a "low-tech" solution to avoid breaking changes to the workflow.define() method but i'm also thinking of the possibility of passing a wrong promised data shape with a resume token like:
await workflow.sendSignal(
ctx,
signalId, // created using a different validator
{
...paymentSignal, // contains payment validator
value
}
)
Could this lead to mismatch between the data shape expected by the container and the actual resolved data?
what do you think of "events"? i think they are also part of vocabulary when talking about workflows or state machines and things like "paymentReceived" and "orderShipped" also qualify as events
Events to me implies that you'd send multiple events / have some handler that runs on each event. @vernal dragon what do you think about signals but keeping it binary / not having it store any data?
Binary gates can be a special case of signals with no value which just block the execution of workflow and I think this can play an important in current workflow where each mutation/action runs in sequence.
there can be applications like human-in-the-loop/approvals or synchronization possible to implement with a binary gate like:
const { toolCalls, messages } = await ctx.runAction(internal.agent.sendMessage, ...)
if (toolCalls) {
await ctx.signals.gate("approval");
await ctx.runAction(internal.agent.callTools, { toolCalls });
}
or waiting for multiple approvals:
const teamMembersApprovals = team.map(p => ctx.signals.gate(`${p.name}Approval`));
const managersApproval = ctx.signals.gate("managerApproval");
// Send prompts
await Promise.all([
...teamMembersApprovals,
managerApproval
]);
I think It can be a very useful primitive for workflows.
In particular passing the signals to the steps vs. awaiting them explicitly before a step:
const upgrade = await step.runQuery(
internal.foo.getUpgradeStatus,
{ ...args },
{ waitFor: [upgradeSignal] }
);
As a way of side-stepping the unvalidated data passing. This way you can fetch all the data you rely on within a query or mutation transaction, vs. getting some data via the signal/promise first, then passing that to a query/mutation/action which may need to do other related lookups anyways
this can be a possible style make execution steps wait for some signal but it should also be possible to write workflows step-by-step as a simple alternative
After some more iteration, I went with events after all:
- You can await an event by name and send an event by name, in the simplest case
- You can also create an event ahead of time and pass around the ID.
- An event is only ever consumed by one step.
- It can be sent first or awaited first, and it'll have timestamps to know how long things took.
- You can send a full "result" payload - which will become the step's result - allowing canceling / failing via event.
I like the defineEvent approach for declaring events and their schemas. I will create a separate conversation to brainstorm about implementing more orchestration patterns in workflow context. for now, i think we have a condition-based waiting feature in workflow.
Thanks @median marsh