# Patterns for back-end at scale, cluster?
I'm trying to use AI and examples from the repo; I'm a beginner
trying to implement xstate in nest.js with good patterns, using bullmq
is this using outdated syntax from v4?
and how does it look? I need parallel processing and stuff
also, since this is a rather long-running process, what about potential failures, e.g. unexpected server restarts and things like that?
should I also persist the running machines in, let's say, postgres?
hmm, it's very hard to understand how this is supposed to help on the back-end
I kind of understand a bit, but some key understanding is still missing
perhaps someone can shed some light... now thinking about this rationally:
private activeAnalyses: Map<string, ActorRef<any>> = new Map();
I shouldn't have this: if we want to scale across a cluster, an in-memory map won't work
need to figure out a good way to think about it
each new analysis flow = its own state machine
so if a user submits a new website that triggers this process, should I create a new machine, start it, persist it in the db, add a new task to the bullmq queue, and return a success message like "process queued" to the user?
or should I just add a new task to bullmq, which then creates the new machine when it's processed? does it matter? actually, yeah, I probably need to create the machine in the original request so I can already put the submitted input url into the context; this way it is fully persisted
then the task processor fetches this machine from the db when it starts processing, and almost everything can run in that single processor
and it simply keeps updating the db every time anything in the machine state changes? what is the process for that? how do I persist it properly, and how do I convert it to and from what's saved in the db?
how do I detect when to update the persisted machine in the db?
this way, if a task doesn't complete (the server restarts, or the task fails for other reasons), next time it loads the machine that has made progress from the db and starts from where it left off?
is this thinking in the right direction or not?
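The "persist on every change, reload on restart" direction can be sketched with XState v5 as below. This is a minimal sketch under assumptions, not a production implementation: `analysisMachine`, `snapshotTable` and `runWorkflow` are hypothetical names, the steps are plain events (not real async work) so the example stays synchronous, and the `Map` stands in for a Postgres table. The answer it suggests to "when do I update the db" is: on every snapshot the actor emits, via `actor.subscribe`.

```typescript
import { setup, assign, createActor } from 'xstate';

// Hypothetical machine: each step is a plain event so the sketch stays synchronous.
// In the real app the steps would be invoked promise actors (crawl, analyze, ...).
const analysisMachine = setup({
  types: {
    context: {} as { url: string; stepsDone: string[] },
    input: {} as { url: string },
    events: {} as { type: 'CRAWLED' } | { type: 'ANALYZED' },
  },
}).createMachine({
  id: 'analysis',
  context: ({ input }) => ({ url: input.url, stepsDone: [] }),
  initial: 'crawling',
  states: {
    crawling: {
      on: {
        CRAWLED: {
          target: 'analyzing',
          actions: assign({ stepsDone: ({ context }) => [...context.stepsDone, 'crawl'] }),
        },
      },
    },
    analyzing: {
      on: {
        ANALYZED: {
          target: 'done',
          actions: assign({ stepsDone: ({ context }) => [...context.stepsDone, 'analyze'] }),
        },
      },
    },
    done: { type: 'final' },
  },
});

// Hypothetical stand-in for a Postgres table keyed by workflow id.
const snapshotTable = new Map<string, string>();

// Starts a new workflow, or resumes it from the last persisted snapshot if one exists.
function runWorkflow(workflowId: string, url: string) {
  const saved = snapshotTable.get(workflowId);
  // `input` is only used when there is no snapshot to restore.
  const actor = createActor(analysisMachine, {
    input: { url },
    snapshot: saved ? JSON.parse(saved) : undefined,
  });
  actor.subscribe((snapshot) => {
    // Only persist live or finished snapshots, never a stopped one.
    if (snapshot.status === 'active' || snapshot.status === 'done') {
      snapshotTable.set(workflowId, JSON.stringify(actor.getPersistedSnapshot()));
    }
  });
  return actor.start();
}
```

In a real worker the DB write is async (e.g. a TypeORM `update`), so those writes would need to be serialized per workflow rather than fired concurrently from the subscriber.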
Hey, sorry for the delay. It's an interesting question, but the solution shouldn't be any different than a general scaling solution
yeah true true, queues are probably the way to go
but to persist state in the db, is there an example?
do you just persist the entire machine?
const snapshot = actor.getPersistedSnapshot();
and then later:
const actor = createActor(machine, { snapshot }).start();
right?
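That round trip can be sketched like this. It is a minimal sketch: `toggleMachine`, `saveSnapshot` and `restoreActor` are hypothetical names, and `JSON.stringify`/`JSON.parse` stand in for writing to and reading from a text or jsonb column.

```typescript
import { createMachine, createActor, type Actor } from 'xstate';

// Tiny hypothetical machine, just enough to have some state worth saving.
const toggleMachine = createMachine({
  id: 'toggle',
  initial: 'inactive',
  states: {
    inactive: { on: { TOGGLE: 'active' } },
    active: { on: { TOGGLE: 'inactive' } },
  },
});

// getPersistedSnapshot() returns plain JSON-serializable data (value, context, status, ...).
function saveSnapshot(actor: Actor<typeof toggleMachine>): string {
  return JSON.stringify(actor.getPersistedSnapshot());
}

// Pass the parsed data back as `snapshot`; the restored actor continues from that state.
function restoreActor(stored: string): Actor<typeof toggleMachine> {
  return createActor(toggleMachine, { snapshot: JSON.parse(stored) }).start();
}
```

According to the XState persistence docs, actions of the restored state are not re-executed on restore, since they are assumed to have already run.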
what's the difference between createActor and createMachine?
nvm, they are completely different
you need createMachine to create the machine that you pass to createActor, right?
okay so actor is a running machine
nice
😄
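To make the machine/actor distinction concrete, here is a minimal sketch with a hypothetical `analysisFlow` machine: one definition, and one independent actor per submitted website.

```typescript
import { createMachine, createActor } from 'xstate';

// createMachine only describes behaviour (a blueprint); nothing runs yet.
const analysisFlow = createMachine({
  id: 'analysisFlow',
  initial: 'queued',
  states: {
    queued: { on: { START: 'running' } },
    running: { on: { FINISH: 'finished' } },
    finished: { type: 'final' },
  },
});

// createActor makes a live instance with its own state; one actor per submitted website.
const siteA = createActor(analysisFlow).start();
const siteB = createActor(analysisFlow).start();

siteA.send({ type: 'START' }); // only siteA moves; siteB is untouched
```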
I'm really new to this; even though I've been trying to understand it, it's still very hard
there seem to be multiple ways to achieve the same thing, and that can be a bit overwhelming
but I think in the end it will make more sense as I work through a few examples of using machines in my app
this is very helpful
the only thing I don't fully get: is it storing the running actor in memory?
if we have multiple servers
and queue
it will be a bit different
hmm, I guess I'm thinking about this:
this starts the machine, then persists it, and returns the workflowId in the response... but what happens to the actual running actor?
it stops on its own and gets garbage collected at this point?
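As far as I understand XState v5, an actor does not stop itself just because nothing references it any more: pending timers and invoked promises keep it running in that process until it reaches a final state or `actor.stop()` is called. A hypothetical `slowMachine` with a delayed transition illustrates this; it is a sketch, not taken from the thread.

```typescript
import { createMachine, createActor } from 'xstate';

// Hypothetical machine; the delayed transition stands in for long-running work.
const slowMachine = createMachine({
  id: 'slow',
  initial: 'waiting',
  states: {
    waiting: { after: { 20: 'finished' } },
    finished: { type: 'final' },
  },
});

// Left alone, this actor keeps its timer alive and transitions on its own.
const kept = createActor(slowMachine).start();

// Stopped explicitly: the pending delayed transition is cancelled.
const stopped = createActor(slowMachine).start();
stopped.stop();
```

So a request handler that starts an actor only to persist its snapshot should probably stop it afterwards, otherwise the work keeps running in whichever process handled the request.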
reading this now https://stately.ai/docs/persistence
Actors can persist their internal state and restore it later. Persistence refers to storing the state of an actor in persistent storage, such as localStorage or a database. Restoration refers to restoring the state of an actor from persistent storage.
I guess what I'm trying to understand is
uhm
let's say we have a long-running process that is orchestrated by a Stately machine... how can we set up our API to also receive events from outside and pass them to the actively running machine, or, if none is running, spawn one? to me this seems a bit confusing in a cluster-type environment... so I guess there needs to be something like an eventsQueue
like, this wouldn't really work; it has to be stateless somehow:
// website-analysis.service.ts
import { Injectable } from '@nestjs/common';
import { createActor, type Snapshot } from 'xstate';
import { websiteAnalysisMachine } from './website-analysis.machine';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { Website } from './entities/website.entity';

@Injectable()
export class WebsiteAnalysisService {
  // in-memory only: lost on restart and not shared between cluster nodes
  private persistedStates: Record<string, Snapshot<unknown>> = {};

  constructor(
    @InjectRepository(Website)
    private websiteRepository: Repository<Website>
  ) {}

  async startAnalysis(url: string): Promise<string> {
    const website = await this.websiteRepository.save({ url });
    const workflowId = website.id;
    const actor = createActor(websiteAnalysisMachine, {
      input: { websiteId: workflowId, url }
    }).start();
    // getPersistedSnapshot() (not getSnapshot()) is the form meant for storing and restoring
    this.persistedStates[workflowId] = actor.getPersistedSnapshot();
    // otherwise the started actor keeps running in this process
    actor.stop();
    return workflowId;
  }

  async sendEvent(workflowId: string, event: any): Promise<void> {
    const snapshot = this.persistedStates[workflowId];
    if (!snapshot) {
      throw new Error('Workflow not found');
    }
    const actor = createActor(websiteAnalysisMachine, { snapshot }).start();
    actor.send(event);
    this.persistedStates[workflowId] = actor.getPersistedSnapshot();
    actor.stop();
  }

  getState(workflowId: string): unknown {
    const persistedState = this.persistedStates[workflowId];
    if (!persistedState) {
      throw new Error('Workflow not found');
    }
    return persistedState;
  }
}
confused about this part, if we have it in the DB:
async sendEvent(workflowId: string, event: any): Promise<void> {
  const snapshot = this.persistedStates[workflowId];
  if (!snapshot) {
    throw new Error('Workflow not found');
  }
  const actor = createActor(websiteAnalysisMachine, { snapshot }).start();
  actor.send(event);
  this.persistedStates[workflowId] = actor.getPersistedSnapshot();
  actor.stop();
}
we could retrieve it from the DB, but how do we know there isn't already another instance of the actor running in another process? so at this point I guess we need a queue
basically, some way to lock is required
in addition to queues, actually
maybe this? https://www.npmjs.com/package/redlock
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { InjectQueue, Process, Processor } from '@nestjs/bull';
import { Job, Queue } from 'bull';
import Redis from 'ioredis';
import { InjectRedis } from '@nestjs-modules/ioredis'; // assumption: ioredis client provided by @nestjs-modules/ioredis
import Redlock from 'redlock'; // redlock v5 uses a default export
import { createActor } from 'xstate';
import { websiteAnalysisMachine } from './website-analysis.machine';
import { Website } from './entities/website.entity';
import { MachineState } from './entities/machine-state.entity';

@Injectable()
export class WebsiteAnalysisService {
  // readonly instead of private so the processor can acquire locks with it
  readonly redlock: Redlock;

  constructor(
    @InjectRepository(Website)
    private websiteRepository: Repository<Website>,
    @InjectRepository(MachineState)
    private machineStateRepository: Repository<MachineState>,
    @InjectQueue('websiteAnalysis') private websiteAnalysisQueue: Queue,
    @InjectRedis() redisClient: Redis,
  ) {
    this.redlock = new Redlock([redisClient], {
      // Redlock options
    });
  }

  async startAnalysis(url: string): Promise<string> {
    const website = await this.websiteRepository.save({ url });
    const workflowId = website.id;
    // created but not started: only used to compute the initial snapshot
    const actor = createActor(websiteAnalysisMachine, {
      input: { websiteId: workflowId, url }
    });
    const initialState = actor.getPersistedSnapshot();
    await this.machineStateRepository.save({
      workflowId,
      state: initialState
    });
    await this.websiteAnalysisQueue.add('processEvent', {
      workflowId,
      event: { type: 'START' }
    }, { jobId: workflowId });
    return workflowId;
  }

  async sendEvent(workflowId: string, event: any): Promise<void> {
    // no fixed jobId here: Bull skips add() when a job with that id already exists,
    // so reusing workflowId as the jobId would silently drop every later event
    await this.websiteAnalysisQueue.add('processEvent', {
      workflowId,
      event
    });
  }

  async getState(workflowId: string): Promise<unknown> {
    const machineState = await this.machineStateRepository.findOne({ where: { workflowId } });
    if (!machineState) {
      throw new Error('Workflow not found');
    }
    return machineState.state;
  }
}

@Processor('websiteAnalysis')
export class WebsiteAnalysisProcessor {
  constructor(
    @InjectRepository(MachineState)
    private machineStateRepository: Repository<MachineState>,
    private websiteAnalysisService: WebsiteAnalysisService
  ) {}

  @Process('processEvent')
  async processEvent(job: Job<{ workflowId: string; event: any }>) {
    const { workflowId, event } = job.data;
    // 5000 ms TTL: if processing takes longer, the lock expires while this worker is still busy
    const lock = await this.websiteAnalysisService.redlock.acquire([`workflow-lock:${workflowId}`], 5000);
    try {
      const machineState = await this.machineStateRepository.findOne({ where: { workflowId } });
      if (!machineState) {
        throw new Error('Workflow not found');
      }
      const actor = createActor(websiteAnalysisMachine, {
        snapshot: machineState.state
      }).start();
      actor.send(event);
      const newState = actor.getPersistedSnapshot();
      await this.machineStateRepository.update(machineState.id, { state: newState });
      if (actor.getSnapshot().status === 'done') {
        // Implement cleanup logic here
      }
      actor.stop();
    } finally {
      await lock.release();
    }
  }
}
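One option to weigh against the Redlock lock (a different technique, named plainly: optimistic concurrency control) is a version number stored next to the snapshot. It also covers the case where the 5000 ms lock TTL expires while a worker is still processing, because the stale worker's write gets rejected. A sketch with an in-memory stand-in; `WorkflowStore`, `compareAndSet` and the `version` column are hypothetical.

```typescript
// Hypothetical row: the persisted snapshot plus a version counter (the version column is an assumption).
interface WorkflowRow {
  snapshot: unknown;
  version: number;
}

// In-memory stand-in for the machine_state table. In Postgres the same check would be roughly
//   UPDATE machine_state SET state = $1, version = version + 1
//   WHERE workflow_id = $2 AND version = $3
// with "0 rows affected" meaning another worker wrote first.
class WorkflowStore {
  private rows = new Map<string, WorkflowRow>();

  insert(workflowId: string, snapshot: unknown): void {
    this.rows.set(workflowId, { snapshot, version: 0 });
  }

  load(workflowId: string): WorkflowRow | undefined {
    const row = this.rows.get(workflowId);
    return row ? { ...row } : undefined;
  }

  // Writes only if the version is still the one we loaded; returns false on conflict.
  compareAndSet(workflowId: string, expectedVersion: number, snapshot: unknown): boolean {
    const current = this.rows.get(workflowId);
    if (!current || current.version !== expectedVersion) return false;
    this.rows.set(workflowId, { snapshot, version: expectedVersion + 1 });
    return true;
  }
}
```

On a conflict, the losing worker can reload the latest snapshot and re-apply its event, or throw so the queue retries the job.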
still it's not perfect I think
what happens here:
actor.send(event);
const newState = actor.getPersistedSnapshot();
I need a way to know that processing has completed and the machine is kind of "idle" before persisting the state
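About `actor.send(event)` followed immediately by `getPersistedSnapshot()`: as far as I know, v5 processes `send()` synchronously, so the snapshot already reflects the transition, but any promise that transition started is still running. XState's `waitFor` resolves once a predicate on the snapshot holds. A minimal sketch, assuming a hypothetical `crawlMachine` and `sendAndSettle` helper:

```typescript
import { setup, createActor, fromPromise, assign, waitFor, type Actor } from 'xstate';

// Hypothetical machine: START kicks off an async step standing in for crawling.
const crawlMachine = setup({
  types: {
    context: {} as { pages: number },
    events: {} as { type: 'START' },
  },
  actors: {
    crawl: fromPromise(async () => {
      await new Promise((resolve) => setTimeout(resolve, 10)); // fake slow I/O
      return 42;
    }),
  },
}).createMachine({
  context: { pages: 0 },
  initial: 'idle',
  states: {
    idle: { on: { START: 'crawling' } },
    crawling: {
      invoke: {
        src: 'crawl',
        onDone: { target: 'done', actions: assign({ pages: ({ event }) => event.output }) },
      },
    },
    done: { type: 'final' },
  },
});

// Sends an event and resolves once the actor reaches a final state (or rejects after 5 s).
// A different predicate, e.g. snapshot.hasTag('idle'), could wait for a "waiting for input" state instead.
async function sendAndSettle(actor: Actor<typeof crawlMachine>, event: { type: 'START' }) {
  actor.send(event);
  return waitFor(actor, (snapshot) => snapshot.status === 'done', { timeout: 5_000 });
}
```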
hmm hmm it is such a complex thing to reason about
there's so many factors
also async processes that are important to resolve
let me try to make a sample machine
I guess long-running async processes are what add the complexity
there needs to be one process where the actor is running, and new events must somehow reach it while it's running
hmm
a use case would be cancelling, for example, in the middle of a long-running actor execution
or, for AI agents, changing a long-running course of action
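Cancelling in the middle of a long step can be modelled as a transition out of the state that invoked it. In v5, `fromPromise` logic receives an `AbortSignal` that is aborted when the invoked actor is stopped, so the promise can clean up or forward the signal (e.g. to `fetch`). A sketch, assuming a hypothetical `cancellableMachine` whose `longStep` stands in for crawling; `abortObserved` is a hypothetical flag that only makes the abort observable.

```typescript
import { setup, createActor, fromPromise } from 'xstate';

let abortObserved = false; // hypothetical flag, only to make the abort observable

const cancellableMachine = setup({
  types: { events: {} as { type: 'CANCEL' } },
  actors: {
    // Hypothetical long-running step; `signal` is aborted when this invoked actor is stopped,
    // which happens when the machine leaves the `running` state.
    longStep: fromPromise<string>(
      ({ signal }) =>
        new Promise<string>((resolve, reject) => {
          const timer = setTimeout(() => resolve('finished'), 5_000);
          signal.addEventListener('abort', () => {
            clearTimeout(timer);
            abortObserved = true;
            reject(new Error('aborted'));
          });
        }),
    ),
  },
}).createMachine({
  initial: 'running',
  states: {
    running: {
      invoke: { src: 'longStep', onDone: 'finished' },
      on: { CANCEL: 'cancelled' },
    },
    finished: { type: 'final' },
    cancelled: { type: 'final' },
  },
});
```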
stumbled upon this now, which seems somewhat related:
actually, the lock does nothing in this case xD
we need bidirectional communication, like a socket
if there's already a long-running actor, we must be able to send it new events from another process, no matter which process it lives in... this can only be done through something like websockets on top of redis
all other solutions are going to be hacky
socketio/socket.io-redis-adapter: "Adapter to enable broadcasting of events to multiple separate socket.io server nodes."
something like that
damn, I'm too confused now 😄
side effects often can't just be fired and forgotten; they're part of how the app functions, and we can't proceed to the next step without some of them
and if these side effects are already running, what happens if a new request comes in at the same time?
e.g. if we have an action that causes a transition to the finished state once it has finished, how can we just forget about it?
things don't happen instantly, hm; persisting needs to take this into account, and new incoming events also have to somehow reach this running actor
Redis Pub/Sub
this might work
// website-analysis.processor.ts
import { InjectQueue, Process, Processor } from '@nestjs/bull';
import { Job, Queue } from 'bull';
import Redis from 'ioredis';
import { InjectRedis } from '@nestjs-modules/ioredis'; // assumption: ioredis client provided by @nestjs-modules/ioredis
import { Actor, createActor, waitFor } from 'xstate';
import { websiteAnalysisMachine } from './website-analysis.machine';

@Processor('websiteAnalysis')
export class WebsiteAnalysisProcessor {
  private subscriber: Redis;
  // actors running in *this* process only, keyed by job id
  private actors = new Map<string, Actor<typeof websiteAnalysisMachine>>();

  constructor(
    @InjectRedis() private redis: Redis,
    @InjectQueue('websiteAnalysis') private websiteAnalysisQueue: Queue
  ) {
    // a connection in subscriber mode can't run other commands, hence duplicate()
    this.subscriber = redis.duplicate();
    this.setupSubscriber();
  }

  private setupSubscriber() {
    this.subscriber.subscribe('analysisEvents');
    this.subscriber.on('message', (channel, message) => {
      if (channel === 'analysisEvents') {
        const { jobId, event } = JSON.parse(message);
        this.handleEvent(jobId, event);
      }
    });
  }

  private handleEvent(jobId: string, event: any) {
    // every process receives every message; only the one owning the actor reacts
    const actor = this.actors.get(jobId);
    if (actor) {
      actor.send(event);
    }
  }

  @Process('analyze')
  async analyze(job: Job<{ url: string }>) {
    const { url } = job.data;
    const jobId = String(job.id); // Bull job ids can be numbers
    const actor = createActor(websiteAnalysisMachine, {
      input: { jobId, url }
    });
    this.actors.set(jobId, actor);

    actor.subscribe(state => {
      // Update job progress
      job.progress(state.context.progress);
      // Emit state updates (only what listeners need)
      this.redis.publish('stateUpdates', JSON.stringify({
        jobId,
        value: state.value,
        context: state.context
      }));
    });

    try {
      actor.start();
      actor.send({ type: 'START' });
      // Wait for a final state, then let Bull record the outcome from the return value or
      // thrown error instead of calling job.moveToCompleted/moveToFailed inside the processor
      const finalState = await waitFor(actor, state => state.status === 'done', { timeout: Infinity });
      if (finalState.matches('cancelled')) {
        throw new Error('Job cancelled');
      }
      return finalState.context.result;
    } finally {
      actor.stop();
      this.actors.delete(jobId);
    }
  }
}

// website-analysis.service.ts
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import Redis from 'ioredis';
import { InjectRedis } from '@nestjs-modules/ioredis';

@Injectable()
export class WebsiteAnalysisService {
  private publisher: Redis;

  constructor(
    @InjectRedis() private readonly redis: Redis,
    @InjectQueue('websiteAnalysis') private websiteAnalysisQueue: Queue
  ) {
    this.publisher = redis.duplicate();
  }

  async startAnalysis(url: string) {
    const job = await this.websiteAnalysisQueue.add('analyze', { url });
    return String(job.id);
  }

  async sendEvent(jobId: string, event: any) {
    // fire-and-forget: if no process currently owns this job's actor, the event is lost
    await this.publisher.publish('analysisEvents', JSON.stringify({ jobId, event }));
  }
}
I have to test it, but in theory it could work
redis ftw
the only issue I can think of right now: what if the processor is no longer there but a new event comes in?
I have to rethink this to also support "pause" for an analysis
which kills the processor, but when we restart, it resumes from where we left off
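For "what if nobody owns the actor when the event arrives", one option (a swapped-in pattern, usually called a durable inbox) is to write the event to storage first and use pub/sub only as a wake-up hint; whichever worker (re)starts the workflow drains the inbox into its actor. That also fits "pause": stop the actor, and on resume restore the snapshot and drain the inbox. The sketch uses in-memory stand-ins; `WorkflowInbox`, `sendEvent`, `deliverPending`, `notify` and `deliver` are hypothetical.

```typescript
// Hypothetical event shape; in the app this would be the machine's event union.
type WorkflowEvent = { type: string };

// Stand-in for a durable store (a Postgres table or a Redis list per workflow).
// Redis pub/sub alone is fire-and-forget: a message nobody handles is gone.
class WorkflowInbox {
  private pending = new Map<string, WorkflowEvent[]>();

  append(workflowId: string, event: WorkflowEvent): void {
    const list = this.pending.get(workflowId) ?? [];
    list.push(event);
    this.pending.set(workflowId, list);
  }

  // Removes and returns everything queued for this workflow, oldest first.
  drain(workflowId: string): WorkflowEvent[] {
    const list = this.pending.get(workflowId) ?? [];
    this.pending.delete(workflowId);
    return list;
  }
}

// API side: record the event durably first, then send a best-effort wake-up hint
// (`notify` would wrap redis.publish in the app).
function sendEvent(
  inbox: WorkflowInbox,
  notify: (workflowId: string) => void,
  workflowId: string,
  event: WorkflowEvent,
): void {
  inbox.append(workflowId, event);
  notify(workflowId);
}

// Worker side: run after (re)starting a workflow's actor and on every wake-up hint
// (`deliver` would be actor.send in the app). Returns how many events were delivered.
function deliverPending(
  inbox: WorkflowInbox,
  workflowId: string,
  deliver: (event: WorkflowEvent) => void,
): number {
  const events = inbox.drain(workflowId);
  events.forEach(deliver);
  return events.length;
}
```

With several workers the drain has to be atomic (e.g. a single `DELETE ... RETURNING` in Postgres, or popping from a Redis list) so two processes don't deliver the same event.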
if I want to reference services from nest.js, is this the proper way?
import { ActorRef, interpret } from 'xstate';
import { websiteAnalysisMachine } from './website-analysis.machine';
import { MyService } from '../my-service.service';

export class WebsiteAnalysisProcessor extends WorkerHost {
  constructor(private readonly myService: MyService) {
    super();
    // ... other initializations ...
  }

  async process(job: Job<{ url: string }>) {
    const { url } = job.data;
    const machineWithServices = websiteAnalysisMachine.withConfig({
      services: {
        crawlWebsite: (context) => this.myService.crawlWebsite(context.url),
        analyzeContent: (context) => this.myService.analyzeContent(context.data),
        analyzeBacklinks: (context) => this.myService.analyzeBacklinks(context.url),
        generateSEOReport: async (context) => this.myService.generateSEOReport(context),
      },
      actions: {
        logStart: () => this.myService.log('Starting website analysis'),
        logCompletion: () => this.myService.log('Website analysis completed'),
        logError: ({ context }) => this.myService.logError(context.error),
      },
    });
    const actor = interpret(machineWithServices)
      .onTransition((state) => {
        // Handle state changes
      })
      .start();
    // ... rest of your processing logic ...
  }
}
I think interpret is the old syntax, yeah
now we have createActor() instead
oh, and withConfig is also v4
hmm, I'm having trouble finding documentation about it
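For the v4 snippet, my understanding of the v5 equivalents: `services` became `actors` (wrapped here with `fromPromise`), `withConfig` became `machine.provide(...)`, and `interpret(...).onTransition(...)` became `createActor(...)` plus `actor.subscribe(...)`. This sketch shows a second option: build the machine in a factory so its actors and actions close over the injected service. `AnalysisService`, `buildAnalysisMachine` and `startAnalysis` are hypothetical names.

```typescript
import { setup, createActor, fromPromise, assign } from 'xstate';

// Hypothetical shape of the injected Nest service (stands in for MyService).
interface AnalysisService {
  crawlWebsite(url: string): Promise<number>;
  log(message: string): void;
}

// v4 "services" are v5 "actors"; the implementations close over the service instance.
function buildAnalysisMachine(service: AnalysisService) {
  return setup({
    types: {
      context: {} as { url: string; pageCount: number },
      input: {} as { url: string },
    },
    actors: {
      crawlWebsite: fromPromise<number, { url: string }>(({ input }) => service.crawlWebsite(input.url)),
    },
    actions: {
      logStart: () => service.log('Starting website analysis'),
      logCompletion: () => service.log('Website analysis completed'),
    },
  }).createMachine({
    context: ({ input }) => ({ url: input.url, pageCount: 0 }),
    entry: 'logStart',
    initial: 'crawling',
    states: {
      crawling: {
        invoke: {
          src: 'crawlWebsite',
          input: ({ context }) => ({ url: context.url }),
          onDone: { target: 'done', actions: assign({ pageCount: ({ event }) => event.output }) },
        },
      },
      done: { type: 'final', entry: 'logCompletion' },
    },
  });
}

// v4 interpret(machine).onTransition(cb).start() becomes createActor + subscribe + start.
function startAnalysis(service: AnalysisService, url: string) {
  const actor = createActor(buildAnalysisMachine(service), { input: { url } });
  actor.subscribe(() => {
    // handle state changes here, e.g. job progress or persistence
  });
  return actor.start();
}
```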
alright I've made some progress
# ---
# Filename: ../uni-back-end/src/website/analysis/website-analysis.controller.ts
# ---
import { Controller, Post, Get, Param, Body } from '@nestjs/common';
import { WebsiteAnalysisService } from './website-analysis.service';
import { Auth } from 'src/iam/authentication/decorators/auth.decorator';
import { AuthType } from 'src/iam/authentication/enums/auth-type.enum';

@Auth(AuthType.None)
@Controller('website')
export class WebsiteAnalysisController {
  constructor(
    private readonly websiteAnalysisService: WebsiteAnalysisService,
  ) {}

  /**
   * Endpoint to start website analysis.
   * @param url The URL of the website to analyze.
   * @returns An object containing the job ID.
   */
  @Post('analyze')
  async analyzeWebsite(@Body('url') url: string): Promise<{ jobId: string }> {
    const jobId = await this.websiteAnalysisService.analyzeWebsite(url);
    return { jobId };
  }

  /**
   * Endpoint to get the status of a website analysis job.
   * @param jobId The ID of the job.
   * @returns The job's status, progress, and result (if completed).
   */
  @Get('status/:jobId')
  async getJobStatus(@Param('jobId') jobId: string): Promise<any> {
    const status = await this.websiteAnalysisService.getJobStatus(jobId);
    return status;
  }
}
}
# ---
# Filename: ../uni-back-end/src/website/analysis/website-analysis.service.ts
# ---
import { Injectable, NotFoundException } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@Injectable()
export class WebsiteAnalysisService {
  constructor(
    @InjectQueue('websiteAnalysis') private websiteAnalysisQueue: Queue,
  ) {}

  /**
   * Initiates a website analysis by adding a job to the queue.
   * @param url The URL of the website to analyze.
   * @returns The job ID for tracking.
   */
  async analyzeWebsite(url: string): Promise<string> {
    const job = await this.websiteAnalysisQueue.add('analyze', { url });
    // BullMQ types job.id as string | undefined
    if (!job.id) {
      throw new Error('Queue did not return a job ID');
    }
    return job.id;
  }

  /**
   * Retrieves the status of a website analysis job.
   * @param jobId The ID of the job.
   * @returns The job's status, progress, and result (if completed).
   */
  async getJobStatus(jobId: string): Promise<any> {
    const job = await this.websiteAnalysisQueue.getJob(jobId);
    if (!job) {
      // NotFoundException makes Nest respond with 404 instead of a generic 500
      throw new NotFoundException('Job not found');
    }
    const state = await job.getState();
    const progress = job.progress;
    const result = job.returnvalue;
    return {
      id: job.id,
      state,
      progress,
      result,
    };
  }

  /**
   * Crawls the website and provides progress updates.
   * @param url The URL to crawl.
   * @param onProgress Callback for progress updates.
   * @returns A promise that resolves with the crawl data.
   */
  async crawlWebsite(
    url: string,
    onProgress: (progress: number) => void,
  ): Promise<any> {
    // Implement the actual crawling logic here
    // Use onProgress(progress) to send progress updates
    // Example implementation:
    return new Promise((resolve) => {
      let progress = 0;
      const interval = setInterval(() => {
        progress += 10;
        onProgress(progress);
        if (progress >= 100) {
          clearInterval(interval);
          resolve({
            /* crawl results */
          });
        }
      }, 1000);
    });
  }

  /**
   * Analyzes content of the website.
   * @param url The URL to analyze.
   * @returns A promise that resolves with the content analysis data.
   */
  async analyzeContent(url: string): Promise<any> {
    // Implement the content analysis logic here
    // Example implementation:
    return new Promise((resolve) => {
      setTimeout(() => {
        resolve({
          /* content analysis results */
        });
      }, 3000);
    });
  }

  /**
   * Analyzes backlinks of the website.
   * @param url The URL to analyze.
   * @returns A promise that resolves with the backlink data.
   */
  async analyzeBacklinks(url: string): Promise<any> {
    // Implement the backlink analysis logic here
    // Example implementation:
    return new Promise((resolve) => {
      setTimeout(() => {
        resolve({
          /* backlink data */
        });
      }, 2000);
    });
  }

  /**
   * Generates an SEO report based on content and backlink data.
   * @param contentAnalysis Content analysis data.
   * @param backlinkData Backlink data.
   * @returns A promise that resolves with the SEO report.
   */
  async generateSEOReport(
    contentAnalysis: any,
    backlinkData: any,
  ): Promise<{ seoScore: number }> {
    // Implement the SEO report generation logic here
    // Example implementation:
    return new Promise((resolve) => {
      setTimeout(() => {
        const seoScore = Math.random() * 100;
        resolve({ seoScore });
      }, 1000);
    });
  }
}
^ this is a working example of using nest.js + bullmq + xstate v5
I haven't added any persistence to it yet, though; for now it creates a new machine for every request... obviously that's not how it should work in the real world
I'll work on that tomorrow to see if I can also add persistence
and communication between processes (so you can send a stop event even in a cluster, with redis pub/sub)
is this kind of typing the way to go when providing actors from above:
actors: {
  // Placeholder actors; implementations will be provided later
  crawlWebsite: undefined as ActorLogic<any, any, any> | undefined, // To be provided
  analyzeContent: undefined as ActorLogic<any, any, any> | undefined, // To be provided
  analyzeBacklinks: undefined as ActorLogic<any, any, any> | undefined, // To be provided
  generateSEOReport: undefined as ActorLogic<any, any, any> | undefined, // To be provided
},
?
didn't find much documentation about scenarios like this
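Instead of `undefined as ActorLogic<any, any, any> | undefined`, one approach in v5 is to register typed placeholder logic in `setup({ actors })` and swap in the real implementation with `machine.provide({ actors })`, which should then be type-checked against the placeholder. A sketch with hypothetical `CrawlInput`/`CrawlOutput` types and a placeholder that throws when nothing was provided:

```typescript
import { setup, createActor, fromPromise, assign } from 'xstate';

// Hypothetical input/output types for one step.
type CrawlInput = { url: string };
type CrawlOutput = { pages: number };

// Placeholder logic: fixes the types now and fails loudly if no implementation is provided.
const crawlWebsitePlaceholder = fromPromise<CrawlOutput, CrawlInput>(async () => {
  throw new Error('crawlWebsite was not provided');
});

const analysisMachine = setup({
  types: {
    context: {} as { url: string; pages: number },
    input: {} as { url: string },
  },
  actors: { crawlWebsite: crawlWebsitePlaceholder },
}).createMachine({
  context: ({ input }) => ({ url: input.url, pages: 0 }),
  initial: 'crawling',
  states: {
    crawling: {
      invoke: {
        src: 'crawlWebsite',
        input: ({ context }) => ({ url: context.url }),
        onDone: { target: 'done', actions: assign({ pages: ({ event }) => event.output.pages }) },
        onError: 'failed',
      },
    },
    done: { type: 'final' },
    failed: { type: 'final' },
  },
});

// Later (e.g. in the Nest processor) the real implementation is swapped in.
const providedMachine = analysisMachine.provide({
  actors: {
    crawlWebsite: fromPromise<CrawlOutput, CrawlInput>(async ({ input }) => ({ pages: input.url.length })),
  },
});
```

An actor created from the unprovided machine ends in `failed`, which makes a forgotten `provide` visible instead of silently hanging.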
almost perfect now 😛 I also added AbortController to enable cancelling long-running processes. if anybody wants to see my nest.js implementation, I can show it, just tag me here
I need a summary of this thread 😅