#Canonical way to get queues using saq in litestar-fullstack?

teal imp
#

Sorry if this information is posted elsewhere or can be found in the documentation, but previously in litestar-fullstack, when using the lib implementations instead of plugins, queues were a standalone variable within the base module:

from __future__ import annotations

import asyncio
from collections import abc
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

import saq
from redis.asyncio import Redis

from app.lib import serialization, settings

if TYPE_CHECKING:
    from signal import Signals


__all__ = ["Queue", "Worker", "WorkerFunction", "queues", "BackgroundTaskError", "CronJob"]


WorkerFunction = abc.Callable[..., abc.Awaitable[Any]]


@dataclass
class Job(saq.Job):
    """Job Details"""

    job_name: str | None = None
    job_description: str | None = None


@dataclass
class CronJob(saq.CronJob):
    """Cron Job Details"""

    job_name: str | None = None
    job_description: str | None = None


class BackgroundTaskError(Exception):
    """Base class for `Task` related exceptions."""


class Queue(saq.Queue):
    """[SAQ Queue](https://github.com/tobymao/saq/blob/master/saq/queue.py).

    Configures `msgspec` for msgpack serialization/deserialization if not otherwise configured.

    Parameters
    ----------
    *args : Any
        Passed through to `saq.Queue.__init__()`
    **kwargs : Any
        Passed through to `saq.Queue.__init__()`
    """

...


class Worker(saq.Worker):
    """Worker."""

    # same issue: https://github.com/samuelcolvin/arq/issues/182
    SIGNALS: list[Signals] = []

    async def on_app_startup(self) -> None:
        """Attach the worker to the running event loop."""
        loop = asyncio.get_running_loop()
        loop.create_task(self.start())
teal imp
#

Cont from above

redis = Redis.from_url(  # type: ignore[call-overload]
    url=settings.redis.URL,
    decode_responses=False,
    socket_connect_timeout=2,
    socket_keepalive=5,
    health_check_interval=5,
)

queues: dict[str, Queue] = {
    # "background-tasks": Queue(redis, name="background-tasks"),
    # "system-tasks": Queue(redis, name="system-tasks"),
    "other-tasks": Queue(redis, name="other-tasks"),
}
"""
[list[Queue]][app.lib.worker.Queue] instances instantiated with a Redis config
instance.
"""

which could then be exported to another file to get the queues such as

from __future__ import annotations

from typing import TYPE_CHECKING

from .base import Queue, queues

__all__ = ["provide_queues"]


if TYPE_CHECKING:
    from collections.abc import Generator


def provide_queues() -> Generator[dict[str, Queue], None, None]:
    yield queues

Wherein provide_queues could be imported elsewhere for getting the queues. Using saq what is the "correct" way to do the same?

Is it importing the created saq variable with from app.domain.plugins import saq and calling saq.get_queues().queues?

The reason I ask is because I also see

for queue in saq_plugin._config.queue_instances.values(): used elsewhere to get the list of queue instances. Maybe @sacred bough you know off the top of your head which option is preferred/best? (I need to sort this out before I can figure out the other issue you chimed in on, since I'm updating 😅 )

The issue with importing saq and trying to get the queues directly from it is E ImportError: cannot import name 'saq' from partially initialized module 'app.domain.plugins' (most likely due to a circular import) (/Users/.../src/app/domain/plugins.py) raised from the tasks.py file, where I check whether other jobs are running with a specific condition (which makes sense, since tasks.py imports saq). The context here is that I'm trying to inject "queues" via Provide into the controller.

sacred bough
#

I'm not sure I've settled on the "best" way to do it. Here's what I'm experimenting with now to inject the queue

#
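def get_queues() -> TaskQueues:
    """Get the queues registered with the SAQ plugin.

    Returns:
        TaskQueues: The queues registered with the SAQ plugin.
    """
    # Imported inside the function so the plugins module is only loaded at call time,
    # which sidesteps the circular import.
    from app.domain import plugins

    return plugins.saq.get_queues()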
teal imp
#

ah hmmm that's still cleaner than what I was trying

sacred bough
#

there's a few more things I'll point out

#

gimme just a few

#
saq = SAQPlugin(
    config=SAQConfig(
        redis=cache.redis,
        namespace=settings.app.slug,
        web_enabled=True,
        worker_processes=1,
        queue_configs=[
            QueueConfig(
                name="system-tasks",
                tasks=[
                    "app.domain.the_domain.tasks.synchronize_price_stuff",
                    "app.domain.the_domain.tasks.export_price_stuff",
                    "app.domain.the_other_domain.tasks.export_to_somewhere",
                    "app.domain.the_other_domain.tasks.export_to_somewhere_else",
                ],
                scheduled_tasks=[
                    CronJob(
                        function="app.domain.the_other_domain.tasks.export_to_somewhere",
                        cron="0 * * * *",
                        unique=True,
                        timeout=500,
                    ),
                    CronJob(
                        function="app.domain.the_other_domain.tasks.export_to_somewhere_else",
                        cron="0 * * * *",
                        unique=True,
                        timeout=500,
                    ),
                    CronJob(
                        function="app.domain.the_domain.tasks.export_price_stuff",
                        cron="0 3 * * *",
                        unique=True,
                        timeout=500,
                    ),
                ],
            ), 
        ],
    ),
)
#

This is a more advanced example

#

You can pass in the function as a stringified name

#

so that you don't have circular imports
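
A minimal sketch of why string references help with circular imports: the config module only holds a dotted path, and the import happens later, when something actually needs the function. The helper name resolve_dotted_path is hypothetical and is not claimed to be how litestar-saq resolves task names internally; json.dumps stands in for a task path such as the ones in the config above.

```python
from importlib import import_module
from typing import Any


def resolve_dotted_path(path: str) -> Any:
    """Import ``package.module.attribute`` lazily and return the attribute."""
    module_name, _, attribute = path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.attribute', got {path!r}")
    # The import runs here, at call time, not when the config module is loaded.
    return getattr(import_module(module_name), attribute)


# Standard-library stand-in for a task path like "app.domain.the_domain.tasks.some_task".
dumps = resolve_dotted_path("json.dumps")
print(dumps({"ok": True}))
```

Because nothing is imported until the path is resolved, the module that defines the plugin never has to import the tasks module at load time.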

#

Here's how you can use the new TaskQueues type:

#
from __future__ import annotations

from time import time
from typing import TYPE_CHECKING

from litestar import Controller, post

from app.lib.schema import Message


if TYPE_CHECKING:
    from litestar_saq.config import TaskQueues


class ThingController(Controller):
    """Handles the things."""

    @post(path="/do-thing")
    async def do_background_thing(self, task_queues: TaskQueues) -> Message:
        """Schedule a background thing."""

        queue = task_queues.get("thing-tasks")
        job = await queue.enqueue(
            "do_the_thing",
            data={
                "thing_id": "1235",
            },
            key="thing_id-1235",  # this ensures you can't run the same job with the same parameters at the same time.
            timeout=1200,
            ttl=43200,
            retries=0,
            # start it in 5 seconds
            scheduled=time() + 5,
        )
        if job:
            return Message(message="Successfully scheduled thing")
        return Message(message="Job is already running or couldn't schedule.")
#

Hopefully, this example helps you
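
For reference, a rough sketch of what the task on the receiving end of that enqueue call might look like. SAQ calls a task with a context object as the first positional argument and the enqueued keyword arguments after it, so data={...} arrives as the data keyword. The body and the return value here are hypothetical, and the function would still have to be listed in the tasks of the "thing-tasks" queue to be found by name.

```python
import asyncio
from typing import Any


async def do_the_thing(ctx: dict[str, Any], *, data: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical task body: context first, then the enqueued keyword arguments."""
    thing_id = data["thing_id"]
    # Real work (API calls, database writes) would go here.
    return {"thing_id": thing_id, "status": "done"}


# Calling the coroutine directly, without a queue, just to show the argument shape.
result = asyncio.run(do_the_thing({}, data={"thing_id": "1235"}))
print(result)
```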

teal imp
#

really appreciate it @sacred bough will follow up if i have any questions 🙏 🙏

sacred bough
#

Also, FWIW, i tend to create a couple of queues

#

Think of it as a "high priority" or "short running" task queue vs. one for running something much longer

#

you don't want all of your available workers saturated on long-running tasks only

#

There is also an undocumented way to run the saq worker within the uvicorn loop

#

(it's a little different than the run-all command)

teal imp
#

could one simply run more saq workers to alleviate the saturation of jobs?

sacred bough
#

yeah you could. But that might not solve your problem if you get a flood of jobs

#

so, I have a few different types of queues

#

depending on what the work is

#

like regular system stuff goes to one queue

teal imp
#

hmm how do the different queues solve that problem?

sacred bough
#

but long running data loads go to a different queue

#

each queue spawns in a separate process

#

with its own concurrency limits
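
A toy simulation of that idea using plain asyncio, with no SAQ involved: each "queue" gets its own concurrency limit, so slow jobs can only fill their own slots. The names run_pool, "long", and "short" and the sleep durations are all made up for illustration.

```python
import asyncio


async def run_pool(name: str, durations: list[float], concurrency: int, finished: list[str]) -> None:
    """Run jobs for one queue, never more than ``concurrency`` at a time."""
    limit = asyncio.Semaphore(concurrency)

    async def run_one(index: int, seconds: float) -> None:
        async with limit:
            await asyncio.sleep(seconds)  # stands in for the real work
            finished.append(f"{name}-{index}")

    await asyncio.gather(*(run_one(i, s) for i, s in enumerate(durations)))


async def main() -> list[str]:
    finished: list[str] = []
    await asyncio.gather(
        # Slow jobs are limited to one slot of their own pool...
        run_pool("long", [0.2, 0.2], concurrency=1, finished=finished),
        # ...so quick jobs keep flowing through a separate pool.
        run_pool("short", [0.01, 0.01, 0.01], concurrency=2, finished=finished),
    )
    return finished


order = asyncio.run(main())
print(order)
```

All of the short jobs finish before the first long one, because the long jobs never occupy a slot that a short job is waiting for.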

teal imp
#

do you have a different # of workers for each queue? (or could you?)

sacred bough
#

i think i would need to move it to the queue config

teal imp
#

yeah because what comes to mind with what I have worked with in the past is autoscaling workers based on queue depth stuff like that, so some sort of background task that checks the number of jobs and if it's over some threshold can spawn more workers or something like that
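
A sketch of the threshold logic described above, as a pure function. Everything here is hypothetical (the function name, the defaults, the idea of one worker per ten queued jobs); how the queue depth is read and how workers are actually started or stopped is left out, since the thread does not settle on a mechanism.

```python
import math


def desired_workers(
    queue_depth: int,
    jobs_per_worker: int = 10,
    min_workers: int = 1,
    max_workers: int = 8,
) -> int:
    """Return how many workers to run for the given number of queued jobs."""
    if jobs_per_worker < 1:
        raise ValueError("jobs_per_worker must be at least 1")
    wanted = math.ceil(queue_depth / jobs_per_worker)
    # Clamp so the pool never drops to zero and never grows without bound.
    return max(min_workers, min(max_workers, wanted))


for depth in (0, 25, 1000):
    print(depth, desired_workers(depth))
```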

teal imp
#

re: more queues btw I assume that would just mean adding more QueueConfigs to the queue_configs=[] list?

sacred bough
#

yeah, exactly

#

and you can have certain queues run within the litestar event loop and others run outside

teal imp
#

have you seen a substantial throughput increase by increasing concurrency limits?

sacred bough
#

No, not really

#

but it's because of the types of jobs i'm running

#

I have lots of long running processes mostly at the moment

sly sierraBOT
#

litestar_saq/config.py line 214

separate_process: bool = True
teal imp
#

ah fair yeah I have a few things that do api calls/data processing which would take a long time too for each (hence the idea of spawning more workers to handle the increased jobs that come in)

sacred bough
#

if you set this to False, it will hook into the litestar on_startup/shutdown hooks to start the queue worker
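
A config sketch of mixing the two modes, assuming separate_process is set per QueueConfig (the remark earlier in the thread about some queues running inside the litestar event loop and others outside suggests this). The "data-loads" queue name is hypothetical, the concurrency argument is an assumption about the QueueConfig signature, and the exact fields may differ between litestar-saq versions.

```python
from litestar_saq import QueueConfig

queue_configs = [
    QueueConfig(
        name="system-tasks",
        tasks=["app.domain.the_domain.tasks.synchronize_price_stuff"],
        # Worker is started from the application's startup/shutdown hooks.
        separate_process=False,
    ),
    QueueConfig(
        name="data-loads",  # hypothetical queue for long-running work
        tasks=["app.domain.the_other_domain.tasks.export_to_somewhere"],
        # Default behaviour: the worker runs outside the application's event loop.
        separate_process=True,
        concurrency=2,  # assumption: a per-queue concurrency limit
    ),
]
```

The list would be passed as queue_configs to SAQConfig, as in the larger example earlier in the thread.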

#

but, i've noticed a huge benefit by splitting my jobs into many smaller jobs

#

that are then chained together

sly sierraBOT
#

examples/map.py line 30

result = await queue.apply(sum_of_squares.__name__, n=10000)
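
A plain-asyncio sketch of the "many smaller jobs" idea, loosely following the sum_of_squares name from the linked example. It shows fan-out and combine with asyncio.gather rather than SAQ's own API, and it does not show true chaining (one job enqueuing the next). The chunk size and the sum_of_squares_chunk helper are made up for illustration.

```python
import asyncio


async def sum_of_squares_chunk(start: int, stop: int) -> int:
    """One small unit of work; in a real setup this would be its own job."""
    await asyncio.sleep(0)  # yield control, as a real job would while waiting on I/O
    return sum(i * i for i in range(start, stop))


async def sum_of_squares(n: int, chunk_size: int = 1_000) -> int:
    """Split the range into chunks, run them concurrently, then combine the results."""
    chunks = [(start, min(start + chunk_size, n)) for start in range(0, n, chunk_size)]
    partials = await asyncio.gather(*(sum_of_squares_chunk(a, b) for a, b in chunks))
    return sum(partials)


total = asyncio.run(sum_of_squares(10_000))
print(total)
```

Small units like these can be retried individually and do not hold a worker slot for the full duration of the whole computation.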
teal imp
#

very interesting thanks for this @sacred bough will probably have questions re: this once I get the rest wired back up from the "old way" to the new way, maybe can then hopefully contribute some stuff to the litestar-fullstack repo or documentation that can help others too (unless you do it before haha)

sacred bough
#

It’s injected by the saq plugin

#

And it exposes all of the queues that you registered with the plugin configs

teal imp
#

ah interesting, so in theory this would obviate the need to inject the queues into any controller through provide_queues wrapped in a Provide, and I could just use that directly?
