Sorry if this information is posted elsewhere or can be found in the documentation. Previously, within litestar-fullstack, when using the lib implementations instead of plugins, `queues` was a standalone variable within the base:
from __future__ import annotations
import asyncio
from collections import abc
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
import saq
from redis.asyncio import Redis
from app.lib import serialization, settings
if TYPE_CHECKING:
from signal import Signals
# Names exported by `from <module> import *`.
# NOTE(review): "queues" is exported here but is not defined in the visible part of
# this file, and `Job` is defined in this file but not exported — confirm both are intentional.
__all__ = ["Queue", "Worker", "WorkerFunction", "queues", "BackgroundTaskError", "CronJob"]
# Type alias: a callable taking any arguments and returning an awaitable.
WorkerFunction = abc.Callable[..., abc.Awaitable[Any]]
@dataclass
class Job(saq.Job):
    """`saq.Job` extended with an optional name and description.

    Both extra fields default to ``None``, so they never have to be supplied
    when a job is constructed.
    """

    # Optional name for the job.
    job_name: str | None = None
    # Optional free-text description of the job.
    job_description: str | None = None
@dataclass
class CronJob(saq.CronJob):
    """`saq.CronJob` extended with an optional name and description.

    Both extra fields default to ``None``, so they never have to be supplied
    when a cron job is declared.
    """

    # Optional name for the cron job.
    job_name: str | None = None
    # Optional free-text description of the cron job.
    job_description: str | None = None
class BackgroundTaskError(Exception):
    """Root of the exception hierarchy for `Task` related failures.

    Adds no behaviour of its own; it exists so callers can catch this single
    type for background-task errors.
    """
class Queue(saq.Queue):
    """Project-local subclass of the [SAQ Queue](https://github.com/tobymao/saq/blob/master/saq/queue.py).

    Documented as configuring `msgspec` for msgpack serialization/deserialization
    if not otherwise configured.

    Parameters
    ----------
    *args : Any
        Passed through to `saq.Queue.__init__()`
    **kwargs : Any
        Passed through to `saq.Queue.__init__()`
    """

    # NOTE(review): the body visible here is only a placeholder (`...`); no
    # `__init__` override that sets up serialization is present. Confirm
    # whether the implementation was elided or the docstring is out of date.
    ...
class Worker(saq.Worker):
    """SAQ worker that is started from inside an already-running event loop."""

    # same issue: https://github.com/samuelcolvin/arq/issues/182
    # NOTE(review): presumably an empty list stops the base worker from
    # installing its own signal handlers — confirm against `saq.Worker`.
    SIGNALS: list[Signals] = []

    async def on_app_startup(self) -> None:
        """Attach the worker to the running event loop.

        Schedules ``self.start()`` as a task on the current loop and returns
        immediately without awaiting it.
        """
        loop = asyncio.get_running_loop()
        # The event loop only keeps weak references to tasks. Hold a strong
        # reference on the instance so the task cannot be garbage-collected
        # while the worker is still running.
        self._start_task = loop.create_task(self.start())