#Patterns for back-end at scale, cluster?

craggy umbra
#

How would one go about using xstate with a framework like nest.js, given that we need to scale horizontally in a cluster-type setting? Any ideas or examples?

craggy umbra
#

I'm trying to use AI and the examples from the repo; I'm a beginner

#

trying to integrate xstate into nest.js with good patterns, using bullmq

#

and how does that look? I need to have parallel processing and stuff
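For parallel work inside a single analysis, one option in xstate v5 is a parallel state: each region invokes its own promise actor, and the parent's `onDone` fires only after every region reaches its final state. A minimal sketch, assuming xstate v5; `analyzeContent` and `analyzeBacklinks` are hypothetical stand-ins with fake delays. Parallelism across processes is a separate concern (more BullMQ workers, or the worker `concurrency` option).

```typescript
import { setup, createActor, fromPromise, assign, waitFor } from 'xstate';

const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical stand-ins for the real analysis calls
const analyzeContent = fromPromise<{ words: number }>(async () => {
  await delay(30);
  return { words: 1200 };
});
const analyzeBacklinks = fromPromise<{ links: number }>(async () => {
  await delay(20);
  return { links: 42 };
});

const analysisMachine = setup({
  types: { context: {} as { words: number | null; links: number | null } },
  actors: { analyzeContent, analyzeBacklinks },
}).createMachine({
  id: 'analysis',
  initial: 'analyzing',
  context: { words: null, links: null },
  states: {
    analyzing: {
      type: 'parallel', // both regions are active, so both actors are invoked at once
      states: {
        content: {
          initial: 'running',
          states: {
            running: {
              invoke: {
                src: 'analyzeContent',
                onDone: { target: 'finished', actions: assign({ words: ({ event }) => event.output.words }) },
              },
            },
            finished: { type: 'final' },
          },
        },
        backlinks: {
          initial: 'running',
          states: {
            running: {
              invoke: {
                src: 'analyzeBacklinks',
                onDone: { target: 'finished', actions: assign({ links: ({ event }) => event.output.links }) },
              },
            },
            finished: { type: 'final' },
          },
        },
      },
      // Taken only after every region has reached its final state
      onDone: 'reporting',
    },
    reporting: { type: 'final' },
  },
});

// Usage: both promises start together; the total time is roughly the slower one
const actor = createActor(analysisMachine).start();
waitFor(actor, (s) => s.status === 'done').then((s) => console.log(s.context)); // logs: { words: 1200, links: 42 }
```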

#

also, since this is a rather long-running process, what about potential failures, e.g. unexpected server restarts and things like that?

#

Should I also persist the running machines in, let's say, Postgres?

#

hmm, it's very hard to understand how this is supposed to help on the back-end

#

I kind of understand a bit, but some key understanding is still missing

#

perhaps someone can shed some light... now thinking about this rationally:

#

private activeAnalyses: Map<string, ActorRef<any>> = new Map();

I shouldn't have this: if we want to scale in clusters, a map living in one process's memory won't work

#

need to figure out a good way to think about it

#

each new analysis flow = its own state machine

#

so if a user submits a new website that triggers this process, should I create a new machine, start it, persist it in the db, add a new task to the bullmq queue, and return a success message to the user, "process queued"?

#

or should I just add a new task to bullmq, which then creates the new machine when it processes the task? does it matter? actually yeah, I probably need to create the machine in the original request so I can already add the submitted input url to its context? this way it is fully persisted

#

then the task processor will fetch this machine from the db when it starts processing, and almost everything can run in that single processor

#

and it will simply keep updating the db every time anything changes in the machine state? what is the process for that... how do I persist it properly, and how do I convert it to and from a format I can save in the db?

#

how do I detect when to update the persisted machine in the db?
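One simple answer to "when to update": subscribe to the actor and write `getPersistedSnapshot()` after every event it processes. A minimal sketch, assuming xstate v5; the machine, its state names, and the `snapshotTable` Map (standing in for a Postgres row keyed by workflow id) are hypothetical.

```typescript
import { setup, createActor, type Actor } from 'xstate';

// Hypothetical stand-in for a Postgres table: workflowId -> JSON snapshot
const snapshotTable = new Map<string, string>();

// Illustrative machine; state and event names are assumptions
const analysisMachine = setup({
  types: {
    context: {} as { url: string },
    input: {} as { url: string },
    events: {} as { type: 'START' } | { type: 'CRAWLED' } | { type: 'DONE' },
  },
}).createMachine({
  id: 'analysis',
  initial: 'queued',
  context: ({ input }) => ({ url: input.url }),
  states: {
    queued: { on: { START: 'crawling' } },
    crawling: { on: { CRAWLED: 'analyzing' } },
    analyzing: { on: { DONE: 'completed' } },
    completed: { type: 'final' },
  },
});

// Save once up front, then again after every event the actor processes.
// Returns a function that stops persisting.
function persistOnChange(workflowId: string, actor: Actor<typeof analysisMachine>): () => void {
  const save = () => {
    snapshotTable.set(workflowId, JSON.stringify(actor.getPersistedSnapshot()));
  };
  save();
  const subscription = actor.subscribe({ next: save });
  return () => subscription.unsubscribe();
}

// Usage
const actor = createActor(analysisMachine, { input: { url: 'https://example.com' } });
const stopPersisting = persistOnChange('wf-1', actor);
actor.start();
actor.send({ type: 'START' });
console.log(JSON.parse(snapshotTable.get('wf-1')!).value); // logs: crawling
stopPersisting();
```

Writing on every event is the simplest rule; debouncing writes or only persisting at settled states are alternatives with different crash-recovery trade-offs.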

#

this way, if the task doesn't complete because the server restarts or the task fails for other reasons, next time it will load the machine that has made progress from the db and start from where it left off?

#

is this thinking in the right direction or not?

long shadow
#

Hey, sorry for the delay. It's an interesting question, but the solution shouldn't be any different from a general scaling solution.

craggy umbra
#

but is there an example of persisting state in a db?

#

do you just persist the entire machine?

#

getPersistedSnapshot();

#

and then later:
const actor = createActor(machine, { snapshot }).start();

#

right?
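That is the round trip. A minimal sketch, assuming xstate v5: `setup(...).createMachine(...)` builds the definition once, each `createActor` call makes a live instance of it, and the JSON string stands in for a database column. The machine and its states are hypothetical.

```typescript
import { setup, createActor } from 'xstate';

// The machine is only a definition; actors are live instances of it
const machine = setup({
  types: { events: {} as { type: 'NEXT' } },
}).createMachine({
  id: 'steps',
  initial: 'crawling',
  states: {
    crawling: { on: { NEXT: 'analyzing' } },
    analyzing: { on: { NEXT: 'reporting' } },
    reporting: { on: { NEXT: 'done' } },
    done: { type: 'final' },
  },
});

// Process A: run an actor and persist it
const first = createActor(machine).start();
first.send({ type: 'NEXT' });
// The JSON string stands in for writing to / reading from a db column
const stored: string = JSON.stringify(first.getPersistedSnapshot());
first.stop(); // persisting does not stop the actor; stop it explicitly

// Process B (another server, or after a restart): restore and continue
const restored = createActor(machine, { snapshot: JSON.parse(stored) }).start();
console.log(restored.getSnapshot().value); // logs: analyzing
restored.send({ type: 'NEXT' });
console.log(restored.getSnapshot().value); // logs: reporting
```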

#

what's the difference between createActor and createMachine?

#

nvm, they are completely different

#

you need createMachine to create the machine that you pass to createActor, right?

#

okay, so an actor is a running machine

#

nice

#

😄

#

I'm really new to this; even though I have been trying to understand it, it's still very hard

#

there seem to be multiple ways to achieve the same thing, and that can be a bit overwhelming

#

but I think in the end it will make more sense as I work through a few examples of using machines in my app

#

this is very helpful

#

the only thing I don't fully get about this: is it storing the running actor in memory?

#

if we have multiple servers

#

and queue

#

it will be a bit different

#

hmm, I guess I'm thinking about this:

#

this starts the machine, then persists it, and returns the workflowId in the response... but what happens to the actual running actor?

#

it stops on its own and gets garbage collected at this point?

#

Actors can persist their internal state and restore it later. Persistence refers to storing the state of an actor in persistent storage, such as localStorage or a database. Restoration refers to restoring the state of an actor from persistent storage.

#

I guess what I'm trying to understand is

#

uhm

#

let's say we have a long-running process that is orchestrated by a Stately machine. How can we set up our API to also receive events from outside and pass them to the actively running machine, or spawn one if none is running? To me it seems a bit confusing in a cluster-type environment, so I guess there needs to be something like an events queue

#

like this wouldn't really work; it has to be stateless somehow:

```typescript
// website-analysis.service.ts
import { Injectable } from '@nestjs/common';
import { createActor, type EventFrom, type Snapshot } from 'xstate';
import { websiteAnalysisMachine } from './website-analysis.machine';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { Website } from './entities/website.entity';

@Injectable()
export class WebsiteAnalysisService {
  // In-memory, per-process store: this is the part that breaks in a cluster,
  // because another instance (or a restarted one) cannot see these snapshots
  private persistedStates: Record<string, Snapshot<unknown>> = {};

  constructor(
    @InjectRepository(Website)
    private websiteRepository: Repository<Website>
  ) {}

  async startAnalysis(url: string): Promise<string> {
    const website = await this.websiteRepository.save({ url });
    const workflowId = website.id;

    const actor = createActor(websiteAnalysisMachine, {
      input: { websiteId: workflowId, url }
    }).start();

    // getPersistedSnapshot() (not getSnapshot()) returns the serializable form
    this.persistedStates[workflowId] = actor.getPersistedSnapshot();
    actor.stop(); // otherwise the actor keeps running in this process

    return workflowId;
  }

  async sendEvent(
    workflowId: string,
    event: EventFrom<typeof websiteAnalysisMachine>
  ): Promise<void> {
    const snapshot = this.persistedStates[workflowId];

    if (!snapshot) {
      throw new Error('Workflow not found');
    }

    // Rehydrate, apply the event, persist the result, then stop again
    const actor = createActor(websiteAnalysisMachine, { snapshot }).start();

    actor.send(event);

    this.persistedStates[workflowId] = actor.getPersistedSnapshot();

    actor.stop();
  }

  getState(workflowId: string): Snapshot<unknown> {
    const persistedState = this.persistedStates[workflowId];

    if (!persistedState) {
      throw new Error('Workflow not found');
    }

    return persistedState;
  }
}
```
#

confused about this part, if we have it in the DB:

```typescript
  async sendEvent(workflowId: string, event: any): Promise<void> {
    const snapshot = this.persistedStates[workflowId];

    if (!snapshot) {
      throw new Error('Workflow not found');
    }

    const actor = createActor(websiteAnalysisMachine, { snapshot }).start();

    actor.send(event);

    this.persistedStates[workflowId] = actor.getPersistedSnapshot();

    actor.stop();
  }
```

we could retrieve it from the DB, but how do we know there's not already another instance of the actor running in another process? so at this point I guess we need a queue

#

basically, some kind of lock is required

#

in addition to queues actually
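The problem the lock addresses can be shown in a single process: two events for the same workflow both load the old state, and the second save overwrites the first (a lost update). A minimal sketch of a per-key mutex that serializes work per workflow id; it only works inside one process, so in a cluster the same role has to be played by a distributed lock (e.g. Redlock) or by routing all events for a workflow to one consumer. `KeyedMutex`, `handleEvent`, and the `store` Map are hypothetical.

```typescript
// Calls with the same key run one after another; different keys run concurrently.
// In-process only.
class KeyedMutex {
  private tails = new Map<string, Promise<void>>();

  async runExclusive<T>(key: string, task: () => Promise<T>): Promise<T> {
    const previous = this.tails.get(key) ?? Promise.resolve();
    let release: () => void = () => {};
    const current = new Promise<void>((resolve) => {
      release = () => resolve();
    });
    const tail = previous.then(() => current);
    this.tails.set(key, tail); // later callers for this key wait on us
    await previous;
    try {
      return await task();
    } finally {
      release();
      // Drop the entry if nobody queued behind us, so the map does not grow forever
      if (this.tails.get(key) === tail) this.tails.delete(key);
    }
  }
}

const mutex = new KeyedMutex();
const store = new Map<string, string[]>(); // stand-in for the persisted snapshot
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical handler: load -> apply event -> save, serialized per workflowId
async function handleEvent(workflowId: string, event: string): Promise<void> {
  await mutex.runExclusive(workflowId, async () => {
    const history = [...(store.get(workflowId) ?? [])]; // "load snapshot"
    await sleep(10); // simulated async work between load and save
    history.push(event); // "apply event"
    store.set(workflowId, history); // "persist snapshot"
  });
}
```

Without `runExclusive`, two concurrent `handleEvent('wf', ...)` calls would each read an empty history and only the last event would survive.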

#
```typescript
import { Injectable, NotFoundException } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { InjectQueue, Processor, Process } from '@nestjs/bull';
import { Queue, Job } from 'bull';
import Redis from 'ioredis';
import Redlock from 'redlock'; // redlock v5 uses a default export
import { createActor, type EventFrom } from 'xstate';
import { websiteAnalysisMachine } from './website-analysis.machine';
import { Website } from './entities/website.entity';
import { MachineState } from './entities/machine-state.entity';

type AnalysisEvent = EventFrom<typeof websiteAnalysisMachine>;

// Assumption: a shared ioredis client; in a real app, inject it via a provider
const redisClient = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379');

@Injectable()
export class WebsiteAnalysisService {
  // Public (readonly) so the processor can use it; a dedicated provider would be cleaner
  readonly redlock: Redlock;

  constructor(
    @InjectRepository(Website)
    private websiteRepository: Repository<Website>,
    @InjectRepository(MachineState)
    private machineStateRepository: Repository<MachineState>,
    @InjectQueue('websiteAnalysis') private websiteAnalysisQueue: Queue
  ) {
    this.redlock = new Redlock([redisClient], {
      // Redlock options
    });
  }

  async startAnalysis(url: string): Promise<string> {
    const website = await this.websiteRepository.save({ url });
    const workflowId = website.id;

    // Not started: we only need the initial snapshot here, without running side effects
    const actor = createActor(websiteAnalysisMachine, {
      input: { websiteId: workflowId, url }
    });

    const initialState = actor.getPersistedSnapshot();

    await this.machineStateRepository.save({
      workflowId,
      state: initialState
    });

    await this.websiteAnalysisQueue.add('processEvent', {
      workflowId,
      event: { type: 'START' }
    }, { jobId: workflowId });

    return workflowId;
  }

  async sendEvent(workflowId: string, event: AnalysisEvent): Promise<void> {
    // No fixed jobId here: Bull ignores add() when a job with the same id
    // already exists, so reusing workflowId would silently drop later events
    await this.websiteAnalysisQueue.add('processEvent', {
      workflowId,
      event
    });
  }

  async getState(workflowId: string): Promise<unknown> {
    const machineState = await this.machineStateRepository.findOne({ where: { workflowId } });
    if (!machineState) {
      throw new NotFoundException('Workflow not found');
    }
    return machineState.state;
  }
}

@Processor('websiteAnalysis')
export class WebsiteAnalysisProcessor {
  constructor(
    @InjectRepository(MachineState)
    private machineStateRepository: Repository<MachineState>,
    private websiteAnalysisService: WebsiteAnalysisService
  ) {}

  @Process('processEvent')
  async processEvent(job: Job<{ workflowId: string; event: AnalysisEvent }>) {
    const { workflowId, event } = job.data;

    // TTL (ms) must exceed the time spent in the critical section below
    const lock = await this.websiteAnalysisService.redlock.acquire([`workflow-lock:${workflowId}`], 5000);

    try {
      const machineState = await this.machineStateRepository.findOne({ where: { workflowId } });
      if (!machineState) {
        throw new Error('Workflow not found');
      }

      const actor = createActor(websiteAnalysisMachine, {
        snapshot: machineState.state
      }).start();

      actor.send(event);

      // Captured right after send(): async work started by the event may still be in flight
      const newState = actor.getPersistedSnapshot();
      await this.machineStateRepository.update(machineState.id, { state: newState });

      if (actor.getSnapshot().status === 'done') {
        // Implement cleanup logic here
      }

      actor.stop();
    } finally {
      await lock.release();
    }
  }
}
```
#

still it's not perfect I think

#

what happens here:

```typescript
actor.send(event);

const newState = actor.getPersistedSnapshot();
```

I need a way to know that processing has completed and the machine is kind of "idle" before persisting the state
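xstate v5 ships `waitFor(actor, predicate, { timeout })`, which resolves with the first snapshot matching the predicate and rejects on timeout. One way to express "idle" is to tag every state that has in-flight async work and wait until no such tag is active before persisting. A minimal sketch; the `busy` tag name, the `crawl` promise actor, and the `save` callback are hypothetical.

```typescript
import { setup, createActor, fromPromise, assign, waitFor, type Actor, type EventFrom } from 'xstate';

// Hypothetical long-running step standing in for a real crawl
const crawl = fromPromise<{ pages: number }, { url: string }>(async () => {
  await new Promise((resolve) => setTimeout(resolve, 50));
  return { pages: 3 };
});

const analysisMachine = setup({
  types: {
    context: {} as { url: string; pages: number | null },
    input: {} as { url: string },
    events: {} as { type: 'START' },
  },
  actors: { crawl },
}).createMachine({
  id: 'analysis',
  initial: 'idle',
  context: ({ input }) => ({ url: input.url, pages: null }),
  states: {
    idle: { on: { START: 'crawling' } },
    crawling: {
      tags: ['busy'], // states with in-flight async work carry this tag
      invoke: {
        src: 'crawl',
        input: ({ context }) => ({ url: context.url }),
        onDone: { target: 'ready', actions: assign({ pages: ({ event }) => event.output.pages }) },
        onError: 'failed',
      },
    },
    ready: {},
    failed: {},
  },
});

// Send an event, wait until no 'busy' state is active, then persist
async function sendAndPersist(
  actor: Actor<typeof analysisMachine>,
  event: EventFrom<typeof analysisMachine>,
  save: (json: string) => void,
) {
  actor.send(event);
  const settled = await waitFor(actor, (snapshot) => !snapshot.hasTag('busy'), {
    timeout: 10_000, // rejects if the work takes longer than this
  });
  save(JSON.stringify(actor.getPersistedSnapshot()));
  return settled;
}
```

The trade-off: the job stays open while the work runs, and work that was in flight when a process dies has to be retried from the last saved snapshot.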

#

hmm, it is such a complex thing to reason about

#

there's so many factors

#

also async processes that are important to resolve

#

let me try to make a sample machine

#

I guess long-running async processes add the complexity

#

there needs to be one process where the actor is running, and new events must somehow reach it while it is running

#

hmm

#

a use case would be being able to cancel, for example, in the middle of a long-running actor execution

#

or, when talking about AI agents, to change a long-running course of action

#

stumbling upon this now, which seems somewhat related:

#

actually, the lock does nothing in this case xD

#

we need bidirectional communication, like a socket

#

if there's already a long-running actor, we must be able to send it new events from another process, no matter which process it lives in... this can only be done through something like websockets on top of Redis

#

all other solutions are gonna be hacky

#

something like that

craggy umbra
#

damn, I'm too confused now 😄

#

often side effects are not something to forget about but part of how the app functions; we can't proceed to the next step without some of them, etc.

#

and if these side effects are already running, what happens if a new request comes in at the same time?

#

like, if we have an action that will cause a transition to the finished state once it has finished, how can we just forget about it?
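In xstate v5, work whose result matters is usually modelled not as a plain action (actions are fire-and-forget) but as an invoked actor: its `onDone` drives the transition, and if another event moves the machine out of the state first, the invoked actor is stopped and its late result is ignored. A minimal sketch, assuming xstate v5; `slowJob` is a hypothetical stand-in. Note that stopping the actor does not stop the underlying promise, which still runs to completion unless the work itself supports cancellation.

```typescript
import { setup, createActor, fromPromise, assign } from 'xstate';

let finishedWork = 0; // how many times the underlying promise actually finished

// Hypothetical slow side effect whose result the machine needs
const slowJob = fromPromise<string>(async () => {
  await new Promise((resolve) => setTimeout(resolve, 30));
  finishedWork += 1;
  return 'report';
});

const machine = setup({
  types: {
    context: {} as { report: string | null },
    events: {} as { type: 'CANCEL' },
  },
  actors: { slowJob },
}).createMachine({
  id: 'job',
  initial: 'working',
  context: { report: null },
  states: {
    working: {
      // The machine only moves on when the invoked actor finishes...
      invoke: {
        src: 'slowJob',
        onDone: { target: 'finished', actions: assign({ report: ({ event }) => event.output }) },
      },
      // ...unless another event arrives first; leaving the state stops the invoked actor
      on: { CANCEL: 'cancelled' },
    },
    finished: { type: 'final' },
    cancelled: { type: 'final' },
  },
});
```

Sending `{ type: 'CANCEL' }` while in `working` ends in `cancelled` with `report` still `null`; without it, the actor reaches `finished` with the promise's output in context.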

#

things don't happen instantly, hm; persisting needs to take this into account, and new incoming events also have to somehow reach this running actor

craggy umbra
#

Redis Pub/Sub

#

this might work

#
```typescript
// website-analysis.processor.ts
import { Processor, Process, InjectQueue } from '@nestjs/bull';
import { Job, Queue } from 'bull';
import Redis from 'ioredis';
import { InjectRedis } from '@nestjs-modules/ioredis'; // assumption: Redis provided by this module
import { createActor, type Actor, type EventFrom, type SnapshotFrom } from 'xstate';
import { websiteAnalysisMachine } from './website-analysis.machine';

type AnalysisEvent = EventFrom<typeof websiteAnalysisMachine>;
type AnalysisSnapshot = SnapshotFrom<typeof websiteAnalysisMachine>;

@Processor('websiteAnalysis')
export class WebsiteAnalysisProcessor {
  private subscriber: Redis;
  // Actors running in *this* process only; every process receives every message
  private actors = new Map<string, Actor<typeof websiteAnalysisMachine>>();

  constructor(
    @InjectRedis() private redis: Redis,
    @InjectQueue('websiteAnalysis') private websiteAnalysisQueue: Queue
  ) {
    // A connection in subscriber mode cannot run other commands, hence the duplicate
    this.subscriber = redis.duplicate();
    this.setupSubscriber();
  }

  private setupSubscriber() {
    void this.subscriber.subscribe('analysisEvents');
    this.subscriber.on('message', (channel: string, message: string) => {
      if (channel === 'analysisEvents') {
        const { jobId, event } = JSON.parse(message);
        this.handleEvent(jobId, event);
      }
    });
  }

  private handleEvent(jobId: string, event: AnalysisEvent) {
    // No-op unless this process owns the actor for jobId
    this.actors.get(jobId)?.send(event);
  }

  @Process('analyze')
  async analyze(job: Job<{ url: string }>) {
    const { url } = job.data;
    const jobId = String(job.id); // Bull job ids may be numbers

    const actor = createActor(websiteAnalysisMachine, {
      input: { jobId, url }
    });
    this.actors.set(jobId, actor);

    // One subscription: report progress, publish updates, resolve when finished
    const finished = new Promise<AnalysisSnapshot>((resolve) => {
      actor.subscribe((state) => {
        void job.progress(state.context.progress);

        // Publish plain data rather than the whole snapshot object
        void this.redis.publish('stateUpdates', JSON.stringify({
          jobId,
          value: state.value,
          status: state.status,
          progress: state.context.progress
        }));

        if (state.status === 'done' || state.matches('cancelled')) {
          resolve(state);
        }
      });
    });

    actor.start();
    actor.send({ type: 'START' });

    try {
      const finalState = await finished;
      // Let Bull record the outcome: throwing fails the job, returning completes it
      if (finalState.matches('cancelled')) {
        throw new Error('Job cancelled');
      }
      return finalState.context.result;
    } finally {
      actor.stop();
      this.actors.delete(jobId);
    }
  }
}

// website-analysis.service.ts
import { Injectable } from '@nestjs/common';

@Injectable()
export class WebsiteAnalysisService {
  private publisher: Redis;

  constructor(
    @InjectRedis() private readonly redis: Redis,
    @InjectQueue('websiteAnalysis') private websiteAnalysisQueue: Queue
  ) {
    this.publisher = redis.duplicate();
  }

  async startAnalysis(url: string) {
    const job = await this.websiteAnalysisQueue.add('analyze', { url });
    return String(job.id);
  }

  async sendEvent(jobId: string, event: AnalysisEvent) {
    // Fire-and-forget broadcast: lost if no process currently owns jobId
    await this.publisher.publish('analysisEvents', JSON.stringify({ jobId, event }));
  }
}
```
#

I have to test it, but in theory it could work

#

redis ftw

#

the only issue I can think of right now: what if the processor is no longer there but a new event comes in?

#

I have to rethink this to also support "pause" analysis

#

which will kill the processor, but when we restart, it resumes from where we left off
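Redis pub/sub is fire-and-forget: if no process owns the workflow at that moment, the message is simply gone. One common mitigation is to store each event durably first and broadcast only a notification; whoever owns (or later resumes) the workflow drains the stored events. A minimal sketch with in-memory stand-ins: the `EventEmitter` plays the Redis channel and the `inbox` Map plays a durable per-workflow store (a table, or a Redis list or stream). `AnalysisWorker` and `sendEvent` are hypothetical names.

```typescript
import { EventEmitter } from 'node:events';

type WorkflowEvent = { type: string };

// Stand-ins (assumptions): `bus` ~ Redis pub/sub channel, `inbox` ~ durable store
const bus = new EventEmitter();
const inbox = new Map<string, WorkflowEvent[]>();

class AnalysisWorker {
  // Handlers for the workflows this process is currently running
  private running = new Map<string, (event: WorkflowEvent) => void>();

  constructor() {
    bus.on('analysisEvents', (msg: { workflowId: string }) => {
      // Every worker hears every broadcast; only the owner reacts
      if (this.running.has(msg.workflowId)) this.drain(msg.workflowId);
    });
  }

  // Called when this worker starts or resumes a workflow (after loading its snapshot)
  own(workflowId: string, handle: (event: WorkflowEvent) => void): void {
    this.running.set(workflowId, handle);
    this.drain(workflowId); // replay whatever arrived while nobody owned it
  }

  // Called on pause, completion, or shutdown
  release(workflowId: string): void {
    this.running.delete(workflowId);
  }

  private drain(workflowId: string): void {
    const handle = this.running.get(workflowId);
    if (!handle) return;
    // In a real cluster this "take everything" step must be atomic (e.g. a row lock or atomic pop)
    const pending = inbox.get(workflowId) ?? [];
    inbox.delete(workflowId);
    for (const event of pending) handle(event);
  }
}

// API side: store the event first, then broadcast only a notification
function sendEvent(workflowId: string, event: WorkflowEvent): void {
  inbox.set(workflowId, [...(inbox.get(workflowId) ?? []), event]);
  bus.emit('analysisEvents', { workflowId });
}

// Usage
const received: string[] = [];
const w1 = new AnalysisWorker();
const w2 = new AnalysisWorker();
w1.own('wf-1', (e) => received.push(`w1:${e.type}`));
sendEvent('wf-1', { type: 'CANCEL' }); // w1 owns wf-1: delivered live
w1.release('wf-1'); // w1 paused or crashed
sendEvent('wf-1', { type: 'UPDATE_OPTIONS' }); // no owner: stays in the inbox
w2.own('wf-1', (e) => received.push(`w2:${e.type}`)); // resumed elsewhere
console.log(received); // logs: [ 'w1:CANCEL', 'w2:UPDATE_OPTIONS' ]
```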

craggy umbra
#

if I want to reference services from nest.js, is this the proper way?

```typescript
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { interpret } from 'xstate';
import { websiteAnalysisMachine } from './website-analysis.machine';
import { MyService } from '../my-service.service';

@Processor('websiteAnalysis')
export class WebsiteAnalysisProcessor extends WorkerHost {
  constructor(private readonly myService: MyService) {
    super();
    // ... other initializations ...
  }

  async process(job: Job<{ url: string }>) {
    const { url } = job.data;

    const machineWithServices = websiteAnalysisMachine.withConfig({
      services: {
        crawlWebsite: (context) => this.myService.crawlWebsite(context.url),
        analyzeContent: (context) => this.myService.analyzeContent(context.data),
        analyzeBacklinks: (context) => this.myService.analyzeBacklinks(context.url),
        generateSEOReport: async (context) => this.myService.generateSEOReport(context),
      },
      actions: {
        logStart: () => this.myService.log('Starting website analysis'),
        logCompletion: () => this.myService.log('Website analysis completed'),
        // v4 action implementations receive (context, event), not an object
        logError: (context) => this.myService.logError(context.error),
      },
    });

    const actor = interpret(machineWithServices)
      .onTransition((state) => {
        // Handle state changes
      })
      .start();

    // ... rest of your processing logic ...
  }
}
```
#

I think interpret is old syntax, yeah

#

now we have createActor() instead

#

oh, and withConfig is also v4

#

hmm having trouble finding documentation about it
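For reference, the v4 to v5 renames relevant to the processor snippet are: `interpret(machine)` becomes `createActor(machine)`, `machine.withConfig({...})` becomes `machine.provide({...})`, `services` become `actors`, and `.onTransition(...)` becomes `.subscribe(...)`. A minimal sketch of injecting a service's methods as actions through `provide`, assuming xstate v5; `MyService` here is a plain class standing in for the injected Nest provider, and the machine is hypothetical.

```typescript
import { setup, createActor } from 'xstate';

// Plain class standing in for an injected NestJS service (hypothetical)
class MyService {
  readonly lines: string[] = [];
  log(message: string): void {
    this.lines.push(message);
  }
}

// Actions are declared by name in setup(); the bodies here are no-op defaults
const websiteAnalysisMachine = setup({
  types: { events: {} as { type: 'FINISH' } },
  actions: {
    logStart: () => {},
    logCompletion: () => {},
  },
}).createMachine({
  id: 'websiteAnalysis',
  initial: 'running',
  states: {
    running: { entry: 'logStart', on: { FINISH: 'completed' } },
    completed: { type: 'final', entry: 'logCompletion' },
  },
});

function startAnalysis(myService: MyService) {
  // v4 withConfig(...) becomes v5 provide(...)
  const machine = websiteAnalysisMachine.provide({
    actions: {
      logStart: () => myService.log('Starting website analysis'),
      logCompletion: () => myService.log('Website analysis completed'),
    },
  });
  // v4 interpret(...) becomes v5 createActor(...); onTransition becomes subscribe
  const actor = createActor(machine);
  actor.subscribe((snapshot) => myService.log(`state: ${String(snapshot.value)}`));
  return actor.start();
}

// Usage
const service = new MyService();
const actor = startAnalysis(service);
actor.send({ type: 'FINISH' });
console.log(service.lines);
```

Inside a Nest processor, `startAnalysis` would receive the injected service (e.g. `this.myService`) from the class constructor.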

craggy umbra
#

alright I've made some progress

#
```typescript
// Filename: ../uni-back-end/src/website/analysis/website-analysis.controller.ts

import { Controller, Post, Get, Param, Body } from '@nestjs/common';
import { WebsiteAnalysisService } from './website-analysis.service';
import { Auth } from 'src/iam/authentication/decorators/auth.decorator';
import { AuthType } from 'src/iam/authentication/enums/auth-type.enum';

@Auth(AuthType.None)
@Controller('website')
export class WebsiteAnalysisController {
  constructor(
    private readonly websiteAnalysisService: WebsiteAnalysisService,
  ) {}

  /**
   * Endpoint to start website analysis.
   * @param url The URL of the website to analyze.
   * @returns An object containing the job ID.
   */
  @Post('analyze')
  async analyzeWebsite(@Body('url') url: string): Promise<{ jobId: string }> {
    const jobId = await this.websiteAnalysisService.analyzeWebsite(url);
    return { jobId };
  }

  /**
   * Endpoint to get the status of a website analysis job.
   * @param jobId The ID of the job.
   * @returns The job's status, progress, and result (if completed).
   */
  @Get('status/:jobId')
  async getJobStatus(@Param('jobId') jobId: string): Promise<any> {
    const status = await this.websiteAnalysisService.getJobStatus(jobId);
    return status;
  }
}
```
#
```typescript
// Filename: ../uni-back-end/src/website/analysis/website-analysis.service.ts

import { Injectable, NotFoundException } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@Injectable()
export class WebsiteAnalysisService {
  constructor(
    @InjectQueue('websiteAnalysis') private websiteAnalysisQueue: Queue,
  ) {}

  /**
   * Initiates a website analysis by adding a job to the queue.
   * @param url The URL of the website to analyze.
   * @returns The job ID for tracking.
   */
  async analyzeWebsite(url: string): Promise<string> {
    const job = await this.websiteAnalysisQueue.add('analyze', { url });
    // BullMQ types job.id as string | undefined
    if (!job.id) {
      throw new Error('Queue did not return a job id');
    }
    return job.id;
  }

  /**
   * Retrieves the status of a website analysis job.
   * @param jobId The ID of the job.
   * @returns The job's status, progress, and result (if completed).
   */
  async getJobStatus(jobId: string): Promise<any> {
    const job = await this.websiteAnalysisQueue.getJob(jobId);

    if (!job) {
      // NotFoundException maps to HTTP 404 instead of a generic 500
      throw new NotFoundException('Job not found');
    }

    const state = await job.getState();
    const progress = job.progress;
    const result = job.returnvalue;

    return {
      id: job.id,
      state,
      progress,
      result,
    };
  }

  /**
   * Crawls the website and provides progress updates.
   * @param url The URL to crawl.
   * @param onProgress Callback for progress updates.
   * @returns A promise that resolves with the crawl data.
   */
  async crawlWebsite(
    url: string,
    onProgress: (progress: number) => void,
  ): Promise<any> {
    // Implement the actual crawling logic here
    // Use onProgress(progress) to send progress updates

    // Example implementation:
    return new Promise((resolve) => {
      let progress = 0;
      const interval = setInterval(() => {
        progress += 10;
        onProgress(progress);
        if (progress >= 100) {
          clearInterval(interval);
          resolve({
            /* crawl results */
          });
        }
      }, 1000);
    });
  }

  /**
   * Analyzes content of the website.
   * @param url The URL to analyze.
   * @returns A promise that resolves with the content analysis data.
   */
  async analyzeContent(url: string): Promise<any> {
    // Implement the content analysis logic here

    // Example implementation:
    return new Promise((resolve) => {
      setTimeout(() => {
        resolve({
          /* content analysis results */
        });
      }, 3000);
    });
  }

  /**
   * Analyzes backlinks of the website.
   * @param url The URL to analyze.
   * @returns A promise that resolves with the backlink data.
   */
  async analyzeBacklinks(url: string): Promise<any> {
    // Implement the backlink analysis logic here

    // Example implementation:
    return new Promise((resolve) => {
      setTimeout(() => {
        resolve({
          /* backlink data */
        });
      }, 2000);
    });
  }

  /**
   * Generates an SEO report based on content and backlink data.
   * @param contentAnalysis Content analysis data.
   * @param backlinkData Backlink data.
   * @returns A promise that resolves with the SEO report.
   */
  async generateSEOReport(
    contentAnalysis: any,
    backlinkData: any,
  ): Promise<{ seoScore: number }> {
    // Implement the SEO report generation logic here

    // Example implementation:
    return new Promise((resolve) => {
      setTimeout(() => {
        const seoScore = Math.random() * 100;
        resolve({ seoScore });
      }, 1000);
    });
  }
}
```

#

^ this is a working example of using nest.js + bullmq + xstate v5

#

I haven't added any persistence to it yet, though; for now it creates a new machine for every request... obviously that's not how it should work in the real world

#

working on that tomorrow to see if I can also add persistence

#

and communication between processes (so you can send a stop event even in a cluster, with Redis pub/sub)

#

is this kind of typing the way to go when providing actors from above:

```typescript
  actors: {
    // Placeholder actors; implementations will be provided later
    crawlWebsite: undefined as ActorLogic<any, any, any> | undefined, // To be provided
    analyzeContent: undefined as ActorLogic<any, any, any> | undefined, // To be provided
    analyzeBacklinks: undefined as ActorLogic<any, any, any> | undefined, // To be provided
    generateSEOReport: undefined as ActorLogic<any, any, any> | undefined, // To be provided
  },
```
#

?

#

didn't find much documentation about scenarios like this
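In xstate v5, one way to keep types without `undefined as ActorLogic<any, any, any>` is to declare each actor in `setup({ actors })` with a typed placeholder created by `fromPromise<Output, Input>` that throws, and swap in the real logic with `provide()`. Because `provide()` is typed against `setup()`, the invoke `onDone` event gets a typed `output`, and a mismatched implementation should be flagged by the compiler. A sketch under those assumptions; `CrawlInput`, `CrawlOutput`, `notProvided`, and `runAnalysis` are hypothetical names.

```typescript
import { setup, createActor, fromPromise, assign, waitFor } from 'xstate';

type CrawlInput = { url: string };
type CrawlOutput = { pages: number };

// Typed placeholder: fails loudly if no implementation was provided
const notProvided = (name: string) => async (): Promise<never> => {
  throw new Error(`${name} was not provided`);
};

const websiteAnalysisMachine = setup({
  types: {
    context: {} as { url: string; pages: number | null },
    input: {} as { url: string },
  },
  actors: {
    crawlWebsite: fromPromise<CrawlOutput, CrawlInput>(notProvided('crawlWebsite')),
  },
}).createMachine({
  id: 'websiteAnalysis',
  initial: 'crawling',
  context: ({ input }) => ({ url: input.url, pages: null }),
  states: {
    crawling: {
      invoke: {
        src: 'crawlWebsite',
        input: ({ context }) => ({ url: context.url }),
        // event.output is typed as CrawlOutput via the placeholder's generics
        onDone: { target: 'done', actions: assign({ pages: ({ event }) => event.output.pages }) },
        onError: 'failed',
      },
    },
    done: { type: 'final' },
    failed: { type: 'final' },
  },
});

// Later, e.g. inside the Nest processor where injected services are available
const machine = websiteAnalysisMachine.provide({
  actors: {
    // Hypothetical fixed result; a real one might call myService.crawlWebsite(input.url)
    crawlWebsite: fromPromise<CrawlOutput, CrawlInput>(async () => ({ pages: 5 })),
  },
});

async function runAnalysis(url: string) {
  const actor = createActor(machine, { input: { url } }).start();
  return waitFor(actor, (snapshot) => snapshot.status === 'done', { timeout: 30_000 });
}
```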

craggy umbra
#

almost perfect now 😛 I also added AbortController to enable cancelling long-running processes. If anybody wants to see my nest.js implementations, I can show them, just tag me here
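For anyone trying the same thing, a minimal sketch of the AbortController pattern (not the author's implementation): keep one controller per running job, have the long-running work check `signal.aborted` between units of work, and let a cancel message (for example one arriving over Redis pub/sub) look up the controller and call `abort()`. `crawlWebsite`, `runJob`, and `cancelJob` are hypothetical.

```typescript
// Hypothetical long-running step that checks the signal between units of work
async function crawlWebsite(
  url: string,
  signal: AbortSignal,
  onProgress: (percent: number) => void,
): Promise<{ url: string; pages: number }> {
  let pages = 0;
  for (let step = 1; step <= 10; step++) {
    if (signal.aborted) {
      throw new Error(`crawl of ${url} was cancelled`);
    }
    await new Promise((resolve) => setTimeout(resolve, 20)); // simulated unit of work
    pages += 1;
    onProgress(step * 10);
  }
  return { url, pages };
}

// One controller per job running in this process
const controllers = new Map<string, AbortController>();

async function runJob(jobId: string, url: string, onProgress: (percent: number) => void) {
  const controller = new AbortController();
  controllers.set(jobId, controller);
  try {
    return await crawlWebsite(url, controller.signal, onProgress);
  } finally {
    controllers.delete(jobId); // no stale controllers after success or cancellation
  }
}

// Returns false if the job is not running in this process
function cancelJob(jobId: string): boolean {
  const controller = controllers.get(jobId);
  if (!controller) return false;
  controller.abort();
  return true;
}
```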

long shadow
#

I need a summary of this thread 😅