#Advanced Alchemy
just add __bind_key__ = <key to use for the metadata> to the model
and it'll be pinned to a separate metadata automatically
I'll have to take a look at it. There may be some differences in my env.py, but I had those from a while ago. Also, alembic.ini references the script_location which is created by league manager, though that can probably be a little more dynamic.
Excellent! that's what that's for!
i use it to have some models added to one database and others to a second database
Makes sense.
i think if you already have metadata created, you can register it manually in the registry under a key. I've not tested this use case, but it should work
so if you added your existing metadata instead of letting the registry create a new one for you under the key foo, you could add __bind_key__ = "foo" to the table to make it automatically get assigned to that metadata
it's metadata_registry in advanced_alchemy.base
you would call the set method with the key you want to use and your existing metadata
and then just use that key on the models
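To make the idea concrete, here is a rough stdlib-only sketch of how a bind-key registry can pin models to a metadata object. It is a toy model of the mechanism, not AA's actual code: the `MetaData` stand-in, the `Base` class, and the `Team`/`Player` models are all made up for illustration, and only the `metadata_registry` name, its `set` call, and `__bind_key__` come from the messages above.

```python
from __future__ import annotations


class MetaData:
    """Stand-in for sqlalchemy.MetaData (assumption): just collects table names."""

    def __init__(self) -> None:
        self.tables: list[str] = []


class MetadataRegistry:
    """Toy registry mapping a bind key to a metadata object."""

    def __init__(self) -> None:
        self._registry: dict[str | None, MetaData] = {None: MetaData()}

    def get(self, bind_key: str | None = None) -> MetaData:
        # Create a fresh metadata the first time a key is requested.
        return self._registry.setdefault(bind_key, MetaData())

    def set(self, bind_key: str | None, metadata: MetaData) -> None:
        # Register an already-existing metadata under a key.
        self._registry[bind_key] = metadata


metadata_registry = MetadataRegistry()


class Base:
    """Hypothetical declarative base that honours __bind_key__."""

    __bind_key__: str | None = None

    def __init_subclass__(cls, **kwargs: object) -> None:
        super().__init_subclass__(**kwargs)
        # Pin the model to whichever metadata is registered for its bind key.
        cls.metadata = metadata_registry.get(cls.__bind_key__)
        cls.metadata.tables.append(cls.__name__.lower())


# Register pre-existing metadata under "foo" before declaring models.
existing = MetaData()
metadata_registry.set("foo", existing)


class Team(Base):
    __bind_key__ = "foo"  # lands on the manually registered metadata


class Player(Base):
    pass  # no bind key, so it lands on the default metadata
```

The ordering matters in this sketch: the existing metadata has to be registered before any model using that key is declared, otherwise the registry would create a new one for it.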
ah interesting... I think I follow. To be honest, all of this stuff felt like it was in too deep when I started, but I think it's starting to congeal a bit. I believe this could work.
Hmm, I think this could work. It'd be good to remove the alembic template stuff from league manager if possible. I do think I reference some logging settings in alembic.ini that are not in AA, but it's not that deep.
you can register the same settings outside of the alembic ini. I can show you the logging facilities that i used to override the alembic logger
This is great news. In all, my desire/hope has been to have as few barriers as possible in order to have a straightforward orm experience (outside of django), which includes the migration story.
keeping the complexity in AA, and allowing customization through configuration is the way!
yep. totally agree.
And to be fair, I think it's pretty much there already. What I can do with league manager is already pretty great, and all the heavy lifting is in AA. Great job with the library!
I think it's low key a major feature of litestar (in terms of how it integrates), but really hard to get the point across without people using it or seeing it in action.
But also, being able to take the "orm" experience in a modular way, to a CLI app, or some other framework... that's pretty major!
yeah, i'd like to do some presentations or co-present at PyCon about this
showing how you can take the same CLI, repo, models, and services to any environment
(I submitted a tutorial proposal on this very thing--I may reach out if it gets accepted 😁)
I'll be there this year. Let's meet up regardless.
Yeah, for sure. I also plan to host an open space on it if the tutorial doesn't get selected.
Very cool to hear!
New release incoming soon
Full Flask support, sync and async. Here's a full CRUD example in Flask:
from __future__ import annotations

import os
from datetime import date  # noqa: TC003
from uuid import UUID  # noqa: TC003

from flask import Flask, request
from pydantic import BaseModel
from sqlalchemy import ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship

from advanced_alchemy.extensions.flask import (
    AdvancedAlchemy,
    FlaskServiceMixin,
    SQLAlchemySyncConfig,
    base,
    filters,
    repository,
    service,
)


class Author(base.UUIDBase):
    """Author model."""

    name: Mapped[str]
    dob: Mapped[date | None]
    books: Mapped[list[Book]] = relationship(back_populates="author", lazy="noload")


class Book(base.UUIDAuditBase):
    """Book model."""

    title: Mapped[str]
    author_id: Mapped[UUID] = mapped_column(ForeignKey("author.id"))
    author: Mapped[Author] = relationship(lazy="joined", innerjoin=True, viewonly=True)


class AuthorService(service.SQLAlchemySyncRepositoryService[Author], FlaskServiceMixin):
    """Author service."""

    class Repo(repository.SQLAlchemySyncRepository[Author]):
        """Author repository."""

        model_type = Author

    repository_type = Repo


class AuthorSchema(BaseModel):
    """Author schema."""

    id: UUID | None = None
    name: str
    dob: date | None = None


app = Flask(__name__)
config = SQLAlchemySyncConfig(connection_string="sqlite:///:memory:")
alchemy = AdvancedAlchemy(config, app)


@app.route("/authors", methods=["GET"])
def list_authors():
    """List authors with pagination."""
    page, page_size = request.args.get("currentPage", 1, type=int), request.args.get("pageSize", 10, type=int)
    limit_offset = filters.LimitOffset(limit=page_size, offset=page_size * (page - 1))
    service = AuthorService(session=alchemy.get_session())  # pyright: ignore[reportArgumentType]
    results, total = service.list_and_count(limit_offset)
    response = service.to_schema(results, total, filters=[limit_offset], schema_type=AuthorSchema)
    return service.jsonify(response)


@app.route("/authors", methods=["POST"])
def create_author():
    """Create a new author."""
    service = AuthorService(session=alchemy.get_session())  # pyright: ignore[reportArgumentType]
    obj = service.create(**request.get_json())
    return service.jsonify(obj)


@app.route("/authors/<uuid:author_id>", methods=["GET"])
def get_author(author_id: UUID):
    """Get an existing author."""
    service = AuthorService(session=alchemy.get_session(), load=[Author.books])  # pyright: ignore[reportArgumentType]
    obj = service.get(author_id)
    return service.jsonify(obj)


@app.route("/authors/<uuid:author_id>", methods=["PATCH"])
def update_author(author_id: UUID):
    """Update an author."""
    service = AuthorService(session=alchemy.get_session(), load=[Author.books])  # pyright: ignore[reportArgumentType]
    obj = service.update(**request.get_json(), item_id=author_id)
    return service.jsonify(obj)


@app.route("/authors/<uuid:author_id>", methods=["DELETE"])
def delete_author(author_id: UUID):
    """Delete an author."""
    service = AuthorService(session=alchemy.get_session())  # pyright: ignore[reportArgumentType]
    service.delete(author_id)
    return "", 204
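For anyone skimming the list endpoint: the pagination there boils down to turning a 1-based page number into a limit/offset pair. A minimal sketch of just that arithmetic, assuming pages start at 1 (the helper name `page_to_limit_offset` is made up here and is not part of AA or Flask):

```python
def page_to_limit_offset(page: int, page_size: int) -> tuple[int, int]:
    """Translate a 1-based page number into a (limit, offset) pair."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    # Page 1 starts at offset 0, page 2 at page_size, and so on.
    return page_size, page_size * (page - 1)


first = page_to_limit_offset(page=1, page_size=10)
third = page_to_limit_offset(page=3, page_size=10)
```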
You can run the same CLI as litestar by running:
uv run flask --app examples.flask.flask_services:app database --help
Lastly, there's a portals utility. It can optionally start a new thread running an asyncio event loop that runs indefinitely. Async code in Flask can be executed there from a sync context, and the results are returned to the caller.
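The general portal idea can be sketched with the standard library alone. This is not AA's implementation, just the underlying technique under that assumption: a background thread owns an event loop, and sync code hands it coroutines via `asyncio.run_coroutine_threadsafe`. The `Portal` class and `fetch_answer` coroutine are hypothetical names for illustration.

```python
import asyncio
import threading
from collections.abc import Coroutine
from typing import Any, TypeVar

T = TypeVar("T")


class Portal:
    """Runs an asyncio event loop in a background thread (illustrative only)."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def call(self, coro: Coroutine[Any, Any, T]) -> T:
        # Schedule the coroutine on the background loop and block for its result.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result()

    def stop(self) -> None:
        # Ask the loop to stop from its own thread, then clean up.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def fetch_answer() -> int:
    await asyncio.sleep(0.01)
    return 42


portal = Portal()
result = portal.call(fetch_answer())  # called from plain sync code
portal.stop()
```

The blocking `future.result()` is what lets a sync view function wait on async work without owning an event loop itself.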
@opal spear can you confirm that the latest release still works with your typer api?
I’ve still not squashed the weird failures I’m getting for Flask only in GitHub CI so it’s not been merged yet.
ya
it's using the global click context now
instead of passing in the ctx parameter
Ah yup, there it is in help:
In other news, I created a litestar app and added my current version of league manager as a dependency... I was able to add my services implementation as a plugin.
nice, did you use the litestar-svcs plugin or roll your own?
Now my routes can look like this:
@get("/")
def get_user(
    league_manager: Container,
) -> str:
    season_service = league_manager.get(SeasonSyncService)
    return str(season_service.count())
very clean
Yeah, I was stoked when I got it working!
I tried using litestar-svcs, but I think there was a bug somewhere...
we can get it patched if there are updates that need to be done
I think it's here:
app_config.signature_namespace.update(
    {
        "Container": Container,
        "Registry": Registry,
    },
)
For some reason, that namespace thing was giving some typing error...
but also, I realized that I could probably just use my existing "LeagueManagerRegistry()" within its own class using the InitPluginProtocol. I looked at litestar-svcs for reference, but this way I could keep it a bit more focused on league manager. It's fairly straightforward.
I'll likely throw up the code later. I'm just building out the litestar app a little more.
what was the error you got?
let me see if I can recreate it
File "/home/shoden/py/codeberg/newsl/.venv/lib/python3.12/site-packages/litestar/routes/http.py", line 133, in _get_response_for_request
return await self._call_handler_function(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/shoden/py/codeberg/newsl/.venv/lib/python3.12/site-packages/litestar/routes/http.py", line 153, in _call_handler_function
response_data, cleanup_group = await self._get_response_data(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/shoden/py/codeberg/newsl/.venv/lib/python3.12/site-packages/litestar/routes/http.py", line 184, in _get_response_data
parsed_kwargs = route_handler.signature_model.parse_values_from_connection_kwargs(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/shoden/py/codeberg/newsl/.venv/lib/python3.12/site-packages/litestar/_signature/model.py", line 205, in parse_values_from_connection_kwargs
return convert(kwargs, cls, strict=False, dec_hook=deserializer, str_keys=True).to_dict()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: Type unions may not contain more than one custom type - type `typing.Union[collections.abc.Callable, typing.Awaitable]` is not supported
hmm, actually, I just commented out the lines:
app_config.signature_namespace.update(
    {
        "Container": Container,
        "Registry": Registry,
    },
)
and that seems to make it work 🤔
I suppose I can use litestar_svcs ... it took me a while last night to narrow it down to those config options
Dependency(skip_validation=True), # noqa: B008
This is handled by the plugin
async def provide_container(
    self,
    svcs_registry: Registry = Dependency(skip_validation=True),  # noqa: B008
) -> AsyncGenerator[Container, None]:
    """Provide the container from the given registry.

    Args:
    ----
        svcs_registry: A `svcs.Registry` instance.
    """
    async with Container(svcs_registry) as container:
        yield container
Yeah, I am using the litestar_svcs plugin now, and I see that line under provide container... but I still get the error if the namespace is updated.
@sterile thistle I've got a fix for this issue you raised: https://github.com/litestar-org/advanced-alchemy/issues/261
it is a breaking change though
Description
class SQLAlchemyAsyncRepositoryReadService(Generic[ModelT], ResultConverter):
    """Service object that operates on a repository object."""

    repository_type: type[SQLAlchemyAsyncRepositoryProtocol[ModelT] | SQLAlchemyAsyncSlugRepositoryProtocol[ModelT]]
Firstly, repository_type is uninitialized, which is flagged automatically in strict mode.
But more importantly, when you do something like this (which happens in the litestar fullstack example):
class UserService(SQLAlchemyAsyncRepositoryService[User]):
    """Handles database operations for users."""

    repository_type = UserRepository
and then try to do:
self.repository: UserRepository = self.repository_type(**repo_kwargs)
the type checker complains: Type "SQLAlchemyAsyncRepositoryProtocol[User] | SQLAlchemyAsyncSlugRepositoryProtocol[User]" is not assignable to type "UserRepository"
MCVE
This is easier to see with a completely synthetic example
class ClassA:
    """Class A."""


class ClassB:
    """Class B."""


class ClassSA(ClassA):
    """Class SA"""


class ClassR:
    """Class R."""

    ctype: type[ClassA] | type[ClassB] = ClassA


class ClassSAR(ClassR):
    ctype = ClassSA

    def __init__(self) -> None:
        super().__init__()
        self.c: ClassSA = self.ctype()
The type checker complains that Type "ClassA | ClassB" is not assignable to type "ClassSA"
This might be a bug in pyright, but I think the behavior makes sense because at runtime ctype() could still return ClassB, so a cast is needed.
Steps to reproduce
Run the MCVE
Package Version
0.20.0
Platform
- ✅ Linux
- ◻️ Mac
- ◻️ Windows
- ◻️ Other (Please specify in the description above)
accepted, bug, help wanted
I'd like to do it sooner than later
class SlugBookSyncService(SQLAlchemySyncRepositoryService[BigIntSlugBook, SlugBookSyncRepository]):
    """Book repository."""

    repository_type = SlugBookSyncRepository
    match_fields = ["title"]

    def to_model(
        self,
        data: ModelDictT[BigIntSlugBook],
        operation: str | None = None,
    ) -> BigIntSlugBook:
        data = schema_dump(data)
        if is_dict_without_field(data, "slug") and operation == "create":
            data["slug"] = self.repository.get_available_slug(data["title"])
        if is_dict_without_field(data, "slug") and is_dict_with_field(data, "title") and operation == "update":
            data["slug"] = self.repository.get_available_slug(data["title"])
        return super().to_model(data, operation)
basically it's the same but you need to pass in a Repository as a class parameter
but you now have correct typing for the repo and the model
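To show why the extra class parameter helps with the typing, here is a small synthetic sketch in the spirit of the MCVE. The classes below are stand-ins invented for illustration, not AA's real service or repository classes: the point is only that a second type variable lets `self.repository` carry the concrete repository type, so no cast is needed.

```python
from typing import Any, Generic, TypeVar

ModelT = TypeVar("ModelT")


class Repository(Generic[ModelT]):
    """Hypothetical base repository."""


# The repository type is its own type variable, bound to the base repository.
RepoT = TypeVar("RepoT", bound=Repository[Any])


class Service(Generic[ModelT, RepoT]):
    """Hypothetical service parametrized by both model and repository."""

    repository_type: type[RepoT]

    def __init__(self) -> None:
        # Typed as RepoT, so subclasses see their concrete repository class.
        self.repository: RepoT = self.repository_type()


class User:
    """Hypothetical model."""


class UserRepository(Repository[User]):
    def find_by_email(self, email: str) -> str:
        return f"lookup:{email}"


class UserService(Service[User, UserRepository]):
    repository_type = UserRepository


service = UserService()
# A type checker resolves service.repository to UserRepository here,
# so the repository-specific method is visible without a cast.
found = service.repository.find_by_email("a@b.c")
```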
Also, this will get added into AA soon, but if you want a factory to generate the "provides" for DI'ing services
ServiceT_co = TypeVar("ServiceT_co", bound=SQLAlchemyAsyncRepositoryService[Any], covariant=True)


def create_service_provider(
    service_class: type[ServiceT_co],
    /,
    statement: Select[Any] | None = None,
    config: SQLAlchemyAsyncConfig | None = c.alchemy,
    error_messages: ErrorMessages | None | EmptyType = Empty,
    load: LoadSpec | None = None,
    execution_options: dict[str, Any] | None = None,
) -> Callable[..., AsyncGenerator[ServiceT_co, None]]:
    """Create a dependency provider for a service."""

    async def provide_service(
        db_session: AsyncSession | None = None,
    ) -> AsyncGenerator[ServiceT_co, None]:
        async with service_class.new(
            session=db_session,
            statement=statement,
            config=config,
            error_messages=error_messages,
            load=load,
            execution_options=execution_options,
        ) as service:
            yield service

    return provide_service
used like so:
dependencies = {
    "teams_service": create_service_provider(TeamService, load=[m.Team.tags, m.Team.members]),
}
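If the factory pattern is unfamiliar, this is a stripped-down, stdlib-only version of the same closure idea. `ToyService` and its `new` classmethod are hypothetical stand-ins (the real service takes a session, statement, config and so on), and a string plays the role of the database session. It only demonstrates how the factory captures per-service options and returns an async generator provider.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator, AsyncIterator, Callable
from contextlib import asynccontextmanager
from typing import TypeVar


class ToyService:
    """Hypothetical stand-in for a repository service."""

    def __init__(self, session: str, load: list[str] | None = None) -> None:
        self.session = session
        self.load = load or []

    @classmethod
    @asynccontextmanager
    async def new(cls, session: str, load: list[str] | None = None) -> AsyncIterator[ToyService]:
        # A real service would manage the session lifecycle around this yield.
        yield cls(session=session, load=load)


ServiceT = TypeVar("ServiceT", bound=ToyService)


def create_service_provider(
    service_class: type[ServiceT],
    /,
    load: list[str] | None = None,
) -> Callable[..., AsyncGenerator[ServiceT, None]]:
    """Return a provider that closes over the per-service options."""

    async def provide_service(db_session: str = "session-from-di") -> AsyncGenerator[ServiceT, None]:
        async with service_class.new(session=db_session, load=load) as service:
            yield service

    return provide_service


async def main() -> ToyService:
    provider = create_service_provider(ToyService, load=["tags", "members"])
    # A DI framework would drive the generator; here it is stepped by hand.
    gen = provider()
    service = await gen.__anext__()
    await gen.aclose()
    return service


service = asyncio.run(main())
```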
It's actually not breaking. I didn't realize I'm able to set a default for a Generic class type var.
PR for timezone migration bug with new alembic version
Description
Disabled timezone in alembic.ini to fix alembic.util.exc.CommandError: Can't locate timezone: UTC error while applying migrations
Reference:
https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
Closes
Triage Required :hospital:, area/alembic, pr/external, size: small
Just going to leave this here as an example of the new methods on the service. Instead of overriding the create and update methods you can just do this:
class UserService(SQLAlchemyAsyncRepositoryService[User, UserRepository]):
    """Handles database operations for users."""

    repository_type = UserRepository
    default_role = constants.DEFAULT_USER_ROLE

    async def to_model_on_create(self, data: ModelDictT[User]) -> ModelDictT[User]:
        return await self._add_role_to_user(data)

    async def to_model_on_update(self, data: ModelDictT[User]) -> ModelDictT[User]:
        return await self._add_role_to_user(data)

    async def _add_role_to_user(self, data: ModelDictT[User]) -> ModelDictT[User]:
        if is_dict(data):
            role_id: UUID | None = data.pop("role_id", None)
            data = await self.to_model(data)
            if role_id:
                data.roles.append(UserRole(role_id=role_id, assigned_at=datetime.now(timezone.utc)))  # noqa: UP017
        return data
So, i've been able to get advanced alchemy extended into the fastapi CLI
there's just a slight snag though because there's no way to inject the asgi app path into the fastapi CLI before the run/dev command
you basically have to do this:
if __name__ == "__main__":
    """Launches the FastAPI CLI with the database commands registered"""
    from fastapi_cli.cli import app as fastapi_cli_app  # pyright: ignore[reportUnknownVariableType]
    from typer.main import get_group

    from advanced_alchemy.extensions.fastapi.cli import register_database_commands

    click_app = get_group(fastapi_cli_app)  # pyright: ignore[reportUnknownArgumentType]
    click_app.add_command(register_database_commands(app))
    click_app()
i think this is "good enough" until there's a more formal way to extend the fastapi cli
Not sure if it's related to AA, but maybe someone faced this? I do an integration test and the coverage is incomplete.
@get("/")
async def index(db_session: AsyncSession) -> Template:
    message_service = MessageService(session=db_session)
    messages = await message_service.list()
    return Template(template_name="index.html", context={"messages": messages})

async def test_index(test_client: AsyncClient):
    response = await test_client.get("/")
    assert response.status_code == HTTP_200_OK
pytest --cov=app tests
coverage html -d .htmlcov
You're likely missing the correct concurrency configuration for coverage
This is my default when working with async + SQLA:
[tool.coverage.run]
branch = true
concurrency = ["thread", "greenlet"]
Wow, that worked, thanks!
@buoyant burrow How does it work? Can you point to the docs maybe?
As I see, it just works
I've updated the fullstack app to use a few of the new possible service patterns. Take a look at the project when you have some time and interest.
class RoleService(SQLAlchemyAsyncRepositoryService[m.Role]):
    """Handles database operations for users."""

    class Repository(SQLAlchemyAsyncSlugRepository[m.Role]):
        """User SQLAlchemy Repository."""

        model_type = m.Role

    repository_type = Repository
    match_fields = ["name"]

    async def to_model_on_create(self, data: ModelDictT[m.Role]) -> ModelDictT[m.Role]:
        data = schema_dump(data)
        if is_dict_without_field(data, "slug"):
            data["slug"] = await self.repository.get_available_slug(data["name"])
        return data

    async def to_model_on_update(self, data: ModelDictT[m.Role]) -> ModelDictT[m.Role]:
        data = schema_dump(data)
        if is_dict_without_field(data, "slug") and is_dict_with_field(data, "name"):
            data["slug"] = await self.repository.get_available_slug(data["name"])
        return data
as a simple example of the updates
Here's the updated User service: https://github.com/litestar-org/litestar-fullstack/blob/main/src/app/domain/accounts/services.py#L26
Hey @marsh jacinth I know I showed you a couple of relationship filters I was playing with a while back, and you showed some interest. I think Alc had some concerns though. Curious if that ever went anywhere? I recently updated them for the latest advanced_alchemy and was curious if it was something that might end up being upstreamed in some form. This is my latest iteration:
@dataclass
class RelationshipFilter(StatementFilter):
    relationship_name: str
    field_name: str
    value: str | None

    def append_to_statement(
        self,
        statement: Select[tuple[ModelT]],
        model: type[ModelT],
    ) -> Select[tuple[ModelT]]:
        relationship = self._get_relationship_attr(model, self.relationship_name)
        if self.value is None:
            return statement
        filter_condition = self._build_filter_condition(relationship)
        if relationship.property.uselist:
            # One-to-Many relationship
            return statement.where(relationship.any(filter_condition))
        # Many-to-One or One-to-One relationship
        return statement.where(relationship.has(filter_condition))

    def _build_filter_condition(
        self,
        relationship: InstrumentedAttribute[Any],
    ) -> BinaryExpression:
        related_model = relationship.property.mapper.class_
        field = getattr(related_model, self.field_name)
        return field.ilike(f"%{self.value}%")

    @staticmethod
    def _get_relationship_attr(
        model: Any,
        key: str | InstrumentedAttribute[Any],
    ) -> InstrumentedAttribute[Any]:
        attr = StatementFilter._get_instrumented_attr(model, key)
        if not isinstance(attr.property, RelationshipProperty):
            raise ValueError(f"{key} is not a relationship attribute")
        return attr
I've not had a chance to finish up those filters. The last thing i did was add a dynamic filter generator to fullstack so that each endpoint/controller can have its own set of filters configured.
This looks good though
let me think about this a bit. I don't think we even have to have a custom filter for this
we can just detect if the field is a relationship
this is helpful to see though. Let me think about the best way to integrate this. Maybe it's just to get you to add this as a PR as is.
Happy to add a PR if that works. I've also got one for association relationship patterns which seems to work well but obviously gets a bit more complex
I'll open an issue in advanced_alchemy with the details of both, means it's not cluttering up this chat
thanks
@distant portal re: your comment on database create and drop, I'm totally in favor of it if we can add it in a generic way. Take a look at sqlalchemy_utils for some inspiration. I think they've got something similar built in.
nice will do 🙂
This looks cool. I just saw that you added commits in the fullstack example and not the inertia one. And it generally saw more development as well.
Is it just a random thing?
yeah, it's just an ADD thing. Something got my attention before finishing the change everywhere. I'll be queuing up some edits over there shortly
I'd like to finish the team portal and create a railway instance for inertia
Hey guys so I was working on the create and drop commands, got a draft pr up here: https://github.com/litestar-org/advanced-alchemy/pull/371 , and noticed that there's not a synchronous mysql driver that gets tested, any reason in particular for this?
Description
This code adds two generic database commands, create and drop, for creating a new database and dropping an existing one respectively. It was briefly discussed here: #365. A lot of the code was heavily inspired by SQLAlchemy-Utils' database.py and adapted for the advanced-alchemy use case.
Currently, it still needs integration testing, so far I've only tested it locally with sqlite.
Closes
Triage Required :hospital:, pr/external, size: small, type/feat
At the time i first wrote it, there wasn't a good sync/async alternative. I think we need to add in the official mysql driver support now that it supports sync and async, similar to psycopg
It's a bit of a pain to add in all the stuff for a new engine, but if you want to add tests for it. We can add additional support. It may be easier to wait for the test refactor. I've been waiting for that before adding in BQ support.
just a quick suggestion, you dont have to handle sqla 1.4
nice 🙂
wouldnt it make sense to have your changes in alembic/utils.py instead of creating a new file? just want to know why you feel its better to stay in a different module 🙂
Well, thats where I started off putting it but then I realized that technically it has nothing to do with alembic
we could just rename alembic to migrations?
true, we could move that into this as well
yes
sorry that I keep sending these disjoint reviews
but if you have the time and want to, I would like it if you can find a way to remove / hide the branches with the dialect somehow
if anyone wants to add new dialects that could make things easier, but for the time being I am OK with if else
yea good point, I'll see if I can figure out a cleaner way to handle that
Hey @versed carbon if you get a chance take a look, I updated the code to use an adapter. So we can have an sqlite adapter, a postgres adapter, etc and if there's not an adapter for your dialect we can show an error message. Based on the tests refactoring and renaming, maybe it would make more sense to get the interface right for the adapters, and just add the ones for sqlite and postgres first and then wait until the test refactor to add the other ones. What do yall think?
You basically have to implement 4 functions on the adapter: create, create_async, drop, and drop_async and state which drivers it supports.
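Roughly, the adapter shape described above could look like the sketch below. Everything here is hypothetical and stdlib-only: the class names, the `supported_drivers` attribute, the `get_adapter` lookup, and the driver names are illustrative guesses, not the code from the PR. The async methods simply run the sync ones in a worker thread.

```python
from __future__ import annotations

import asyncio
import os
import sqlite3
import tempfile
from abc import ABC, abstractmethod


class DatabaseAdapter(ABC):
    """Hypothetical adapter interface: four operations plus supported drivers."""

    supported_drivers: frozenset[str] = frozenset()

    @abstractmethod
    def create(self, path: str) -> None: ...

    @abstractmethod
    async def create_async(self, path: str) -> None: ...

    @abstractmethod
    def drop(self, path: str) -> None: ...

    @abstractmethod
    async def drop_async(self, path: str) -> None: ...


class SQLiteAdapter(DatabaseAdapter):
    supported_drivers = frozenset({"pysqlite", "aiosqlite"})

    def create(self, path: str) -> None:
        # Writing and dropping a table forces SQLite to materialize the file.
        conn = sqlite3.connect(path)
        try:
            conn.execute("CREATE TABLE _init (id INTEGER)")
            conn.execute("DROP TABLE _init")
            conn.commit()
        finally:
            conn.close()

    async def create_async(self, path: str) -> None:
        await asyncio.to_thread(self.create, path)

    def drop(self, path: str) -> None:
        # For a file-based database, dropping means deleting the file.
        if os.path.exists(path):
            os.remove(path)

    async def drop_async(self, path: str) -> None:
        await asyncio.to_thread(self.drop, path)


ADAPTERS: dict[str, DatabaseAdapter] = {"sqlite": SQLiteAdapter()}


def get_adapter(dialect: str, driver: str) -> DatabaseAdapter:
    adapter = ADAPTERS.get(dialect)
    if adapter is None or driver not in adapter.supported_drivers:
        # No adapter registered: surface a clear error instead of guessing.
        raise ValueError(f"No database adapter for {dialect}+{driver}")
    return adapter


db_path = os.path.join(tempfile.mkdtemp(), "demo.db")
adapter = get_adapter("sqlite", "pysqlite")
adapter.create(db_path)
created = os.path.exists(db_path)
asyncio.run(adapter.drop_async(db_path))
dropped = not os.path.exists(db_path)
```

Keeping the dialect lookup in one place is what removes the if/else branches from the command code; adding a dialect then means registering one more adapter.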
heya, thanks 🙂
I will give it a review and drop any comments (if any)
like the adapter btw
Hello,
As we are getting close to 1.0 for AA, there is an issue regarding the release, feel free to participate !
I tried doing STI stuff today and ran into several issues, has anyone tried using advanced alchemy for STI use cases?
Can't quite figure out where it is being injected, but the changelogs are linking to issues in the litestar repo instead of aa's repo 😅
I don't think that's intentional 😄
it's definitely not intentional. It looks like it's just on the hosted doc page, so it's likely a doc generation issue.
You mean the preview?
Ah
I know where the issue comes from
It's in the sphinx ext
I think that one was just copy+pasted over from Litestar
yep
go for it, if you don't mind
aha, I didn't grep hard enough 🙂
np, was just clicking around and noticed it 🙂
after staring at the same projects for so many hours, all the text just runs together. I think we have this same issue in a few other projects where we've copied the docs structure from litestar proper
The latest AA release has a change to the plugin config that causes the users plugin to break, since we're grabbing the AA plugin's config in order to provide SQLA sessions
https://github.com/litestar-org/advanced-alchemy/commit/7775dbf5c16ba8497cfc993df1ca1109d632e143#diff-28b8aa60560e171249234af327a5494ccc737daf6dd0ed98d3b6fdc4d4bd80d0R31
I think I know where this is coming from though, this is to support multiple engines/databases?
Perhaps I should change the users config to accept an instance of the AA config directly, instead of trying to infer it. That way, someone could have both n amount of engines and specify exactly which one should be responsible for users
Removes future annotations from the main code base. Users should still feel free to use in their own code.
This should help resolve type hinting issues such as import datetime and `from datetim...
My code was written by a clown and accesses the private SQLAlchemyPlugin._config attribute, so that's on me.
Though the public property might not have existed at the time
i don't think it did exist at the time. There should be a config now
does it fix once you move to that?
i made the config always return a list instead of a list or a single instance
yes, AA supports multiple database engines
you can mix and match any driver type and async/sync into a single config
I believe that would solve it, but I would naively have to use the config at index zero, which is why I'm thinking of changing things up
This suffices as a hotfix, there wasn't support for multiple engines anyway - so no user impact
I'll tidy things up later
when you are ready to add in multi-engine support, you are likely just going to need to add a session_dependency_key to your configs that tells your code what engine/session to use
That's a great solution I didn't consider
That won't work everywhere though since we can't rely on DI within middleware
I'm not sure I'm following. There's a provide_session you can call to get the active session. I'll double check, but i think it accepts a bind key. It's been a bit since I last looked at that.
I'll go double check the API documentation - odds are that things have changed quite a bit since last time
Bit behind the times to update AA, so was on the way from 0.27 -> 1.0. Looks like in 0.31, the de-future-annotations change you made, though, @marsh jacinth, seems to break every CollectionFilter use I have in my code with this error:
NameError: name 'abc' is not defined. Did you mean: 'abs'? Or did you forget to import 'abc'?
It was as simple to fix as moving the from collections import abc from the TYPE_CHECKING block into the standard import block.
Before I go and make an issue and MCVE about this, couple questions:
- has this come up since 0.31's release at all that you know of? Seems weird that this could've gone this long without being noticed if it completely broke the CollectionFilter types, and I so far have only been able to repro this when using AA with Litestar rather than using it directly.
- If this is specific to Litestar, you'd (probably?) think having the relevant import in the module(s) in which CollectionFilter is actually used should resolve this. Should this be considered an AA bug or a Litestar bug?
You’ve been the only person reporting this. I use collection filters quite extensively as well.
Can you share what your code looks like?
Will you replace your filters.py with this and let me know if the outcome changes?
And if that doesn't work, try this:
No difference in this case
And a different error from this one:
ImportError while loading conftest '/home/jordan/Projects/athenaeum_core/test/conftest.py'.
test/conftest.py:18: in <module>
from src.database.database import PgUuidBase
src/database/__init__.py:1: in <module>
from src.database.database import ( # noqa: F401
src/database/database.py:10: in <module>
from advanced_alchemy.base import CommonTableAttributes
.venv/lib64/python3.13/site-packages/advanced_alchemy/__init__.py:1: in <module>
from advanced_alchemy import (
.venv/lib64/python3.13/site-packages/advanced_alchemy/filters.py:434: in <module>
class SearchFilter(StatementFilter):
.venv/lib64/python3.13/site-packages/advanced_alchemy/filters.py:470: in SearchFilter
def _func(self) -> attrgetter[Callable[[str], BinaryExpression[bool]]]:
E TypeError: type 'operator.attrgetter' is not subscriptable
Just a sec and I'll give some detail on what's going on overall
A sec is a bit longer when in typical rural fashion the internet disappears on you...
So here's an example (happens for all of the relevant list endpoints that use a collection filter) - pretty basic Litestar + AA combination I think. I use some custom filters here but commenting them all out
@get(
    "/audit",
    summary="List audit log entries",
    description="""
    List audit log entries, optionally filtered.
    """,
    guards=[
        provide_auditlog_read_guard,
    ],
    dependencies={
        "audit_repository": Provide(provide_audit_repo),
        "user_id_filter": Provide(provide_audit_user_id_filter),
        "endpoint_filter": Provide(provide_audit_endpoint_filter),
        "method_filter": Provide(provide_audit_method_filter),
        "status_filter": Provide(provide_audit_status_filter),
        "ip_address_filter": Provide(provide_audit_ip_address_filter),
        "user_agent_filter": Provide(provide_audit_user_agent_filter),
    },
)
async def list_audit_logs(
    self,
    audit_repository: AuditLogRepository,
    user_id_filter: SearchFilter,
    endpoint_filter: SearchFilter,
    method_filter: CollectionFilter,
    status_filter: CollectionFilter,
    ip_address_filter: SearchFilter,
    user_agent_filter: SearchFilter,
    limit_offset: LimitOffset,
) -> OffsetPagination[AuditLogBasicResponse]:
    results, total = await audit_repository.list_and_count(
        user_id_filter,
        endpoint_filter,
        method_filter,
        status_filter,
        ip_address_filter,
        user_agent_filter,
        limit_offset,
    )
    return OffsetPagination(
        items=[AuditLogBasicResponse.model_validate(result) for result in results],
        total=total,
        limit=limit_offset.limit,
        offset=limit_offset.offset,
    )
And an example of one of the filters being applied:
async def provide_audit_method_filter(
    method_search: list[str] | None = Parameter(
        query="method",
        default=None,
        required=False,
    ),
) -> CollectionFilter:
    return CollectionFilter(
        field_name="method",
        values=method_search,
    )
That ends up in this stack trace which still makes me think that Litestar/Msgspec is trying to coerce that abc type which is somehow or other not imported at the time of coercion. Doesn't matter whether I import it in my code anywhere, because I guess it's only in the import cache and not actually available in the module when that coercion happens
File "/home/jordan/Projects/athenaeum_core/.venv/lib64/python3.13/site-packages/litestar/_signature/model.py", line 205, in parse_values_from_connection_kwargs
return convert(kwargs, cls, strict=False, dec_hook=deserializer, str_keys=True).to_dict()
~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/jordan/Projects/athenaeum_core/.venv/lib64/python3.13/site-packages/msgspec/_utils.py", line 238, in get_dataclass_info
hints = get_class_annotations(obj)
File "/home/jordan/Projects/athenaeum_core/.venv/lib64/python3.13/site-packages/msgspec/_utils.py", line 160, in get_class_annotations
value = _eval_type(value, cls_locals, cls_globals)
File "/home/jordan/Projects/athenaeum_core/.venv/lib64/python3.13/site-packages/msgspec/_utils.py", line 47, in _eval_type
return typing._eval_type(t, globalns, localns, ())
~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.13/typing.py", line 474, in _eval_type
return t._evaluate(globalns, localns, type_params, recursive_guard=recursive_guard)
~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.13/typing.py", line 1081, in _evaluate
eval(self.__forward_code__, globalns, localns),
~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<string>", line 1, in <module>
NameError: name 'abc' is not defined. Did you mean: 'abs'? Or did you forget to import 'abc'?
I did also wonder if perhaps my usage of future annotations might be messing with something here
no, i removed it from the library but it's fine to use in your own code
collection filter does take a generic arg
though
CollectionFilter[str]
which is the type for the return
in the collection
Lemme try that, 1 sec
No difference with or without the generic typing
It makes sense that it's only an issue for the Collection/NotInCollection filters though
Since they're the only two with any typing relevant to abc
yeah, not sure why i'm not seeing the issue though.
Oh, wonder if I can repro this with the datetime filters though
i'm going to create a branch for you to try if thats ok?
Yeah sure
heh yeah - same thing with OnBeforeAfter:
NameError: name 'datetime' is not defined. Did you forget to import 'datetime'?
do you have anything in your signature_namespace?
Not unless I've done something inadvertently
i'm guessing this is going to just make things work for you
but i'm not sure why you need it
ooops
you probably have a different error now
i see it in the CI
That brings this error back, doesn't like the attrgetter it seems
I just rolled back that attrgetter change
if you upgrade and try again
CI is running so i don't know if it fixed it yet
Yep, sure enough that fixes it
datetime.datetime has a similar issue
where if you use from datetime import datetime
add that to your signature_namespace
and then use import datetime in your code
basically when you have something in a type checking block, litestar will try to resolve it correctly at runtime. signature_namespace and signature_types let you give it "hints" (or more like directly tell it how to resolve certain type hints)
Yeah, which is why I could repro that with the OnBeforeAfter filter too
i wonder if you added signature_namespace={"abc": abc} to your litestar app object if that would fix the version you had issues with
I'll give it a shot, bear with me
no rush. thanks. I'll also get this fix merged.
filters generally have an impact on the openapi schema so it's probably better to not have that file do any "if TYPE_CHECKING"
It looks like the signature_namespace change doesn't make a difference
Really bizarre nobody has reported this, though, if it's not specifically related to something I'm doing in my code
Thank you for your input though Cofin! Brilliant help as always
glad we got it sorted. I'll get this in the next release
Sounds good - I'll roll back to 0.30.0 for now and keep an eye for the update
Cheers mate
So for the upcoming storage type integration, I'm thinking of adding the following:
- instead of using sqlalchemy events like after commit, i plan to hook the upload and deletion of files into the repository
so that if you enable a flag that turns on "file handling"
it auto-uploads and deletes when the files attached to the model are changed
Here's an example of what it'll look like
class Document(Base):
    __tablename__ = "documents"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50))
    # Single file storage
    attachment: Mapped[Optional[FileObject]] = mapped_column(
        StoredObject(backend="local_test_store"),  # Use StoredObject wrapper
        nullable=True,
    )
    # Multiple file storage
    images: Mapped[Optional[FileObjectList]] = mapped_column(
        StoredObject(backend="local_test_store", multiple=True),  # Use StoredObject wrapper
        nullable=True,
    )
obj1 = FileObject(filename="image1.jpg", target_filename=img1_path, content_type="image/jpeg")
obj1_updated = await backend.save_object_async(obj1, img1_content)
obj2 = FileObject(filename="image2.png", target_filename=img2_path, content_type="image/png")
obj2_updated = await backend.save_object_async(obj2, img2_content)
img_list = MutableList[FileObject]([obj1_updated, obj2_updated])
doc = Document(name=doc_name, images=img_list)
This is an example where you attach multiple files to a single row.
(this is the manual file management without the automated repo hooks)
It has obstore and fsspec integration to start with
and you can register the filesystem into a registry object to create multiple filesystems
storages.register_backend(ObstoreBackend(fs=MemoryStore(), key="memory"))
storages.register_backend(
    ObstoreBackend(
        fs=LocalStore(prefix=Path(tmp_path / "file_object_test_storage"), automatic_cleanup=False, mkdir=True),  # pyright: ignore
        key="local_test_store",
    )
)
this registers a key called memory to a Memory Store
and local_test_store to a Local filesystem
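To illustrate the idea of a keyed registry in isolation, here is a small standard-library sketch. `BackendSketch` and `StorageRegistrySketch` are hypothetical stand-ins, not the Advanced Alchemy classes; only the method name `register_backend` and the keys `memory` and `local_test_store` are taken from the messages above, and `get_backend` is an assumed lookup name.

```python
from dataclasses import dataclass, field


@dataclass
class BackendSketch:
    """Hypothetical stand-in for a storage backend; keeps bytes in memory."""

    key: str
    files: dict[str, bytes] = field(default_factory=dict)

    def save(self, path: str, data: bytes) -> None:
        self.files[path] = data


class StorageRegistrySketch:
    """Hypothetical registry that maps a key to one backend instance."""

    def __init__(self) -> None:
        self._backends: dict[str, BackendSketch] = {}

    def register_backend(self, backend: BackendSketch) -> None:
        if backend.key in self._backends:
            raise ValueError(f"backend {backend.key!r} is already registered")
        self._backends[backend.key] = backend

    def get_backend(self, key: str) -> BackendSketch:
        try:
            return self._backends[key]
        except KeyError:
            raise KeyError(f"no storage backend registered under {key!r}") from None


storages = StorageRegistrySketch()
storages.register_backend(BackendSketch(key="memory"))
storages.register_backend(BackendSketch(key="local_test_store"))

# A column declared with backend="local_test_store" would look its backend up by key.
storages.get_backend("local_test_store").save("image1.jpg", b"...")
```

Looking backends up by key is what lets a model refer to a store by name without importing the filesystem object itself.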
i'm thinking of releasing this before i do the repo integration though just so that we can get this out in the field and let people say what they like and don't like
https://github.com/litestar-org/advanced-alchemy/issues/456 @marsh jacinth
I am going back to my Litestar based project and I updated dependencies to the latest versions to bring my code up to speed. Apart from some issues with subclassing I am stuck on a change in AA:
from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService
class Repository(Repository[Admin]):
    uniquify = True
    model_type = Admin
class Service(SQLAlchemyAsyncRepositoryService[ModelT]):
    pass
class TaggedService(Service[ModelT]):
    ...
class Service(TaggedService[Admin]):
    repository_type = Repository
    ...
    async def to_model(self, data: ModelDictT[Admin], operation: str | None = None) -> Admin:
causes
async def to_model(self, data: ModelDictT[Admin], operation: str | None = None) -> Admin:
~~~~~~~~~~^^^^^^^
TypeError: string indices must be integers, not 'DeclarativeAttributeIntercept'
I'm not sure I understand what changed here, I was using AA 0.25.0, now I'm on 1.4.0
What's your repo look like?
what do you mean?
SQLAlchemyAsyncRepositoryService[ModelT]
sorry I thought you meant my git repo 😄
you should have a SQLAlchemyAsyncRepository?
class Repository(SQLAlchemyAsyncRepository[ModelT]):
    uniquify = True
    pass
sorry, I missed an important part, I'll edit above
updated question post, added proper definition of Repository and Service
Service requires 2 arguments if you don't define the repo in-line
(it's condensed because I have my own hierarchy with overlapping names)
so repository_type = Repository
is obsolete?
class Service(SQLAlchemyAsyncRepositoryService[Admin, Repository]):
    repository_type = Repository
or
class Service(SQLAlchemyAsyncRepositoryService[Admin]):
    class Repository(SQLAlchemyAsyncRepository[Admin]):
        uniquify = True
        model_type = Admin
    repository_type = Repository
the second option is what I do, with the difference Repository is not defined within Service
Also, try this: #1339666444580945931 message
from __future__ import annotations
seems to have fixed it, thanks
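One plausible reading of the TypeError, sketched with the standard library only: if the alias name is bound to a plain string at runtime, an eagerly evaluated annotation ends up subscripting a `str`. That the real `ModelDictT` behaves this way is an assumption made for illustration, and `ModelDictSketch` and `Admin` below are hypothetical stand-ins.

```python
class Admin:
    """Hypothetical stand-in for the mapped model."""


# Assumption for illustration: the alias name is bound to a plain string at runtime.
ModelDictSketch = "Admin | dict"

# Subscripting a str is what an eagerly evaluated annotation would attempt.
try:
    ModelDictSketch[Admin]
    eager_error = ""
except TypeError as exc:
    eager_error = str(exc)


# A string annotation, which is what `from __future__ import annotations`
# produces for every hint, is stored unevaluated, so nothing is subscripted.
def to_model(data: "ModelDictSketch[Admin]") -> "Admin":
    return Admin()


print(eager_error)
print(to_model.__annotations__["data"])
```

Under that assumption, deferring annotation evaluation avoids the subscript entirely, which would be consistent with the import fixing the error.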
for some reason I didn't find that post when looking for ModelDictT
(as always, the best part of Litestar is the people!)
we appreciate that! glad we got it worked out quickly
Hey @marsh jacinth a while back I mentioned a couple custom repository filters I was using for relationships/association objects. At the time the discussion was put on hold (I think Alc had some concerns?). Just wondering if this is something you'd want to circle back around to? If so I can look at making a PR for it in the near future
Can you remind me what you were thinking specifically? We may be in a better spot to do this now.
These were the two I think I showed you last time, more or less no different to then:
@dataclass
class RelationshipFilter(StatementFilter):
    relationship_name: str
    field_name: str
    value: str | None

    def append_to_statement(
        self,
        statement: Select[tuple[ModelT]],
        model: type[ModelT],
    ) -> Select[tuple[ModelT]]:
        relationship = self._get_relationship_attr(model, self.relationship_name)
        if self.value is None:
            return statement
        filter_condition = self._build_filter_condition(relationship)
        if relationship.property.uselist:
            # One-to-Many relationship
            return statement.where(relationship.any(filter_condition))
        # Many-to-One or One-to-One relationship
        return statement.where(relationship.has(filter_condition))

    def _build_filter_condition(
        self,
        relationship: InstrumentedAttribute[Any],
    ) -> BinaryExpression:
        related_model = relationship.property.mapper.class_
        field = getattr(related_model, self.field_name)
        return field.ilike(f"%{self.value}%")

    @staticmethod
    def _get_relationship_attr(
        model: type[ModelT],
        key: str | InstrumentedAttribute[Any],
    ) -> InstrumentedAttribute[Any]:
        attr = StatementFilter._get_instrumented_attr(model, key)
        if not isinstance(attr.property, RelationshipProperty):
            raise ValueError(f"{key} is not a relationship attribute")
        return attr
and
@dataclass
class AssociationObjectFilter(StatementFilter):
    relationship_name: str
    association_remote_relationship: str
    remote_field_name: str
    remote_value: str | None
    association_field: str | None = None
    association_value: str | None = None

    def append_to_statement(
        self,
        statement: Select[tuple[ModelT]],
        model: type[ModelT],
    ) -> Select[tuple[ModelT]]:
        relationship = self._get_relationship_attr(model, self.relationship_name)
        if self.remote_value is None and (
            self.association_value is None or self.association_field is None
        ):
            return statement
        filter_condition = self._build_filter_condition(relationship)
        return statement.where(relationship.any(filter_condition))

    def _build_filter_condition(
        self,
        relationship: InstrumentedAttribute[Any],
    ) -> ColumnElement[bool]:
        conditions = []
        association_class = relationship.property.mapper.class_
        remote_relationship = getattr(
            association_class,
            self.association_remote_relationship,
        )
        remote_class = remote_relationship.property.mapper.class_
        if self.remote_value is not None:
            remote_field = getattr(remote_class, self.remote_field_name)
            conditions.append(
                remote_relationship.has(remote_field.ilike(f"%{self.remote_value}%")),
            )
        if self.association_field and self.association_value is not None:
            association_attr = getattr(association_class, self.association_field)
            conditions.append(association_attr == self.association_value)
        return and_(true(), *conditions)

    @staticmethod
    def _get_relationship_attr(
        model: Any,
        key: str | InstrumentedAttribute[Any],
    ) -> InstrumentedAttribute[Any]:
        attr = StatementFilter._get_instrumented_attr(model, key)
        if not isinstance(attr.property, RelationshipProperty):
            raise ValueError(f"{key} is not a relationship attribute")
        return attr
Just a heads up.... we need to pin Alembic to less than 1.16
1.16 came out this week and it seems to have breaking changes
I feel like this should have been a major version
the alembic.ini is optional
now
maybe it's not introducing any breaking changes when used through the standard cli. i'll have to do some more testing tomorrow though
I'm faced with
sqlalchemy.exc.InvalidRequestError: The unique() method must be invoked on this Result, as it contains results that include joined eager loads against collections
but all my Repositories have
uniquify = True
As the code used to work I suppose I'm missing some change to adapt to. Running on AA 1.4.4
you may have hit a bug. if you pass in uniquify on the method call itself, what happens?
if that works, can you show me a bit more about what your class looks like?
as in
await service.get(item_id=call_id, load=loadopts, uniquify=True)?
yep
lo and behold, it worked
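For context on why the error asks for `unique()` at all, here is a standard-library sketch using `sqlite3` rather than SQLAlchemy. The table names `calls` and `ios` are hypothetical. A join against a collection yields one row per child, so the parent appears several times until the rows are de-duplicated.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE calls (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE ios (id INTEGER PRIMARY KEY, call_id INTEGER REFERENCES calls(id));
    INSERT INTO calls (id, name) VALUES (1, 'first'), (2, 'second');
    INSERT INTO ios (id, call_id) VALUES (10, 1), (11, 1), (12, 2);
    """
)

# A JOIN against a collection returns one row per child, so the parent repeats.
rows = conn.execute(
    "SELECT calls.id, calls.name FROM calls "
    "JOIN ios ON ios.call_id = calls.id ORDER BY calls.id, ios.id"
).fetchall()
parent_ids = [row[0] for row in rows]

# Order-preserving de-duplication, which is the effect wanted for the parent rows.
unique_parent_ids = list(dict.fromkeys(parent_ids))
conn.close()

print(parent_ids)
print(unique_parent_ids)
```

This is only the row-level picture; how the repository decides when to apply uniquification is exactly what the thread is trying to pin down.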
ok, so i thought i'd fixed this, but i guess i missed a spot
there's an issue that was opened and i thought i'd implemented a fix for
my case has 5 joinedloads and one of them is composite
it feels like the issue is in the latter
in the controller for the Call entity I have
loadopts = [
    Call.tags,
    joinedload(Call.function),
    joinedload(Call.category),
    joinedload(Call.backend_callback),
    joinedload(Call.frontend_callback),
    joinedload(Call.ios).joinedload(IO.dataset)
]
this one might be the issue
joinedload(Call.ios).joinedload(IO.dataset)
yeah, but if you set uniquify on the class, it should use that
how are you setting it exactly?
(when it didn't work)
in all classes like this:
class Repository(Repository[Call]):
    model_type = Call
    uniquify = True
class Service(TaggedService[Call]):
    tag_class = Tag
    repository_type = Repository
(some are TaggedServices, some are not)
do you have a .new call somewhere like in a dependency?
if so, would you mind setting it there to see if the issue is isolated to overriding the class level setting or if the .new will override it globally as well
that'll help me narrow down the specific instance that's not getting respected
no I don't have a .new call and I'm not sure I understand what you mean
yes, the errors are in the gets tho
the entities are created via service.create(Entity())
uniquify=True works for service.get but not for service.list
but I'm passing a statement to the list and an item_id to the get
how are you creating the service instance though? I’d like to have you try passing in the uniquify on the instance creation to see if that changes things globally.
def provides_service(db_session: AsyncSession) -> Service:
    """Constructs repository and service objects for the request."""
    return Service(session=db_session, order_by=(Call.scheduled, False))
then provides_service is injected as dependency
should I add uniquify=True there?
yep, please try
got a different error...
AttributeError: 'NoneType' object has no attribute 'name'
but this might be due to a more recent change I introduced
unless you can make sense of it
I wouldn't expect that error from it
are you using this in Litestar, FastAPI, or neither?
Litestar
ok, here's a tip you can use then in that case
you don't need to create the provides_service for each model any more.
you can call create_service_provider(<service name>, load=..., uniquify=True, etc.): https://github.com/litestar-org/litestar-fullstack/blob/main/src/app/domain/accounts/deps.py#L17
Also, there's a caveat with order by
if you plan to use filters on the frontend, you probably don't want to use order_by on the service call
it's best to define the default order by as a filter. This will automatically be injected into your route.
if you know you'll only be using the service on the controller or route, you can declare the whole thing in one call like this: https://github.com/litestar-org/litestar-fullstack/blob/main/src/app/domain/teams/controllers/teams.py#L30-L35
this is unrelated to your issue, but it may help you in the future
interesting. was this always available? I built the bulk of this app a year ago
no, this came out just before we hit 1.0 i think. there was a flood of usability tweaks that came in around then, i think
is there a way to debug where in the DTO handling that exception is produced?
I have this
2025-06-10 16:41:47,611 loglevel=ERROR logger=litestar _default_exception_logging_handler() L146 Uncaught exception (connection_type=http, path=/v1/calls):
Traceback (most recent call last):
File "/usr/local/lib/python3.12/dist-packages/litestar/middleware/_internal/exceptions/middleware.py", line 158, in __call__
await self.app(scope, receive, capture_response_started)
File "/usr/local/lib/python3.12/dist-packages/litestar/routes/http.py", line 81, in handle
response = await self._get_response_for_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/dist-packages/litestar/routes/http.py", line 133, in _get_response_for_request
return await self._call_handler_function(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/dist-packages/litestar/routes/http.py", line 184, in _call_handler_function
return await route_handler.to_response(app=scope["litestar_app"], data=response_data, request=request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/dist-packages/litestar/handlers/http_handlers/base.py", line 581, in to_response
data = return_dto_type(request).data_to_encodable_type(data)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/dist-packages/litestar/dto/base_dto.py", line 123, in data_to_encodable_type
return backend.encode_data(data)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/dist-packages/litestar/dto/_backend.py", line 366, in encode_data
_transfer_data(
File "/usr/local/lib/python3.12/dist-packages/litestar/dto/_backend.py", line 596, in _transfer_data
return field_definition.instantiable_origin(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/dist-packages/litestar/dto/_backend.py", line 597, in <genexpr>
_transfer_data(
File "/usr/local/lib/python3.12/dist-packages/litestar/dto/_backend.py", line 622, in _transfer_data
return _transfer_instance_data(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/dist-packages/litestar/dto/_backend.py", line 673, in _transfer_instance_data
unstructured_data[field_definition.name] = _transfer_type_data(
^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/dist-packages/litestar/dto/_backend.py", line 699, in _transfer_type_data
return _transfer_instance_data(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/dist-packages/litestar/dto/_backend.py", line 667, in _transfer_instance_data
else attribute_accessor(source_instance, field_definition.name)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'name'
do you have one of your types on the model set to required when you have the field as nullable?
I have a nullable foreign key
which by chance is NULL for all the records at the moment as it's a new addition
what's your type hint on this column?
but I'm not loading that entity in this query
category_id: Mapped[UUID] = mapped_column(ForeignKey("category.id"),index=True, nullable=True)
And what does your DTO look like?
category_id: Mapped[Optional[UUID]] = mapped_column(ForeignKey("category.id"),index=True, nullable=True)
what's the relationship look like?
it excludes the category_id AND the mapped entity
Mind sharing a bit more code though?
no, actually it only excludes the metadata in the category entity.. let me try one thing
if I exclude the entire Category entity in the DTO I still have the same error. The query doesn't load any field from it, the DTO explicitly excludes it
the definition is quite complex
DTO is
read_many_config = DTOConfig(max_nested_depth=2,experimental_codegen_backend=False,
    rename_fields={
        "taglist": "tags",
        "function.aliaslist": "aliases",
        "function.portlist": "ports",
        "function.autotaglist": "autotags",
        "function.calllist": "calls",
    },
    exclude=[
        "created_at","updated_at","ioslist",
        "ios", "function_id","category_id",
        "function.ports","function.autotags","function.aliases","function.calls","function.published","function.enabled","function.available","function.timeout_seconds","function.definition",
        "function.portlist","function.autotaglist","function.aliaslist","function.calllist","function.taglist","function.created_at","function.updated_at",
        "function.type","function.synchronous","function.id",
        #"category.calls","category.calllist","category.taglist","category.created_at","category.updated_at",
        "category",
        "callback.call", "callback.secret", "callback.call_id", "callback.created_at","callback.updated_at", "callback.id",
    ],partial=True)
read_many = SQLAlchemyDTO[Annotated[Call, read_many_config]]
yikes. at this point wouldn't it be easier to just make a Struct?
and skip the DTO
I know, but I want to stick with DB models
otherwise maintenance creeps up
anyway, the list method looks like this
I would start by making sure the type hints are correct on the models
@get(return_dto=DTOS.read_many)
async def get_calls(self, service: Service,
                    request: Request,
                    tag: str | None,
                    functionname: str | None,
                    function_id: UUID | None,
                    admin: str | None,
                    details: bool = False) -> list[Call]:
    """Get a list of calls."""
    admintagnames = [tag for tag in request.user.tags]
    basestmt = select(Call)
    if function_id:
        basestmt = basestmt.join(Function).where(Function.id == function_id)
    elif functionname:
        basestmt = basestmt.join(Function).where(Function.name == functionname)
    else:
        pass
    if tag:
        subq_call = aliased(Call)
        subq_call_tag_association = aliased(call_tag_association_table)
        subq_tag = aliased(Tag)
        # Subquery
        subquery = select(subq_call.id).join(subq_call_tag_association).join(subq_tag).where(subq_tag.name.in_(admintagnames))
        # Main query
        stmt = basestmt.join(call_tag_association_table).join(Tag).where(and_(
            Call.id.in_(subquery),
            Tag.name == tag
        ))
    else:
        stmt = basestmt.join(call_tag_association_table).join(Tag).where(Tag.name.in_(admintagnames))
    if admin:
        stmt = stmt.where(Call.admin == admin)
    for qpname, qpvalue in request.query_params.items():
        lqpname = qpname.lower()
        if hasattr(Call, lqpname):
            stmt = stmt.where(getattr(Call, lqpname) == qpvalue)
    #loadopts=[noload("*"),Call.tags,Call.function]
if it's optional, make sure you have it on the type hint as well
    if details:
        loadopts = [
            Call.tags,
            joinedload(Call.function),
            #joinedload(Call.category),
            joinedload(Call.backend_callback),
            joinedload(Call.frontend_callback),
            joinedload(Call.ios).joinedload(IO.dataset)
        ]
    else:
        loadopts = [
            noload("*"),
            Call.function,
            #Call.category,
            Call.tags
        ]
    call_list = await service.list(statement=stmt, load=loadopts, uniquify=True)
    return call_list
how?
ah you mentioned it above, sorry
same result. I'm really starting to think it's not the category
hence my request on how to debug the DTO mappings to see where it fails
i would start excluding relationships from the dto totally until you see which ones (or if) it starts working
I don't think there's anything more than the stack trace and sleuthing re: debugging
but if there's something we can do to help in that re: i'm all ears
well it would be useful to know which entity that "name" refers to. I have three related entities with "name" in them, one being category, but the other 2 used to work
anyway I'll dig more tomorrow, thanks for the support!
it was this
read_many_config = DTOConfig(max_nested_depth=2,experimental_codegen_backend=False,
I have no idea why I set it, it's the only DTO where I set it, possibly a remnant of some past debugging. Must have stopped working when I updated Litestar a month ago to an up to date version
we should figure out what's not working in the litestar repo re: the code gen here
because it's not really experimental any more. i think it's the default
but the codegen works. if you disable it then it doesn't work
I think I disabled it months ago because I was following the flow with the debugger trying to understand why my DTO wasn't doing what I wanted.
oh you are saying that removing that flag made it work?
yes
Has anyone had any issues with migration not applying?
I've got a case where alembic is saying it is applying a migration but silently failing
I will test it in sometime
I'm not sure i have a reproducer from fullstack
Does it actually update the version in the alembic version table or just never apply that either?
do you have an issue link or something to repro? wasn't seeing anything like this on alembic (nor on any other migration tools, they always fail very loud)
I tracked it down. The object store loses the alias you give it when generating migrations. The error it generates was suppressed by my logging config.
Ohh that's why I was not able to reproduce this error. I manually added backend arg in my object storage
Yep.
btw...
I was wondering how do you do DB migrations with alembic for libraries? I.e. create a plugin and publish this as a standalone package. I know this is not a litestar issue per se, but even for other sqlalchemy / alembic projects I didn't find a good or supportable solution for this. I also checked the source of the litestar-users project , because it brings DB models with it, but didn't find a solution for this. (And yes it sounds like the Django model but still this issue is something I am trying to solve for external packaged libraries)
I've got a pattern i've been using that works*
There's a few caveats though
so, in Adv Alchemy, we ship a default metadata registry (and registry factory). This is auto-used when you use the bases in AA (UUID/BigInt/AuditBase...)
in the third party library, you will want to use one of the built in registries (or cache one with the factory). Also, in the library, you need to make sure the model is imported into the user's modules. So, you can attach the model in __all__ or something like that to ensure it gets picked up.
AA then injects the same registries by default into alembic with its configurations
Ok but this only works when you don't ship already created migration files in this external package, and you rely on a "make-migrations" run on the global app. What about shipping already created migration files, to have some order
This is fair. I think this falls into the * part of my statement.
I think we need to try to find a way to include version_locations in the AA alembic config https://alembic.sqlalchemy.org/en/latest/branches.html#multiple-bases
And then have a way for plugins to reference it somehow and append to it
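A rough sketch of the "append to it" step, using only the standard library. The function name `build_version_locations` and the directory paths are hypothetical, and it assumes the Alembic path separator is configured as `os.pathsep`. How Advanced Alchemy would hand the resulting value to Alembic is left open, since the thread has not settled that.

```python
import os
from pathlib import Path


def build_version_locations(app_versions: Path, plugin_versions: list[Path]) -> str:
    """Join the app's migration directory with plugin-supplied ones, skipping duplicates."""
    # dict.fromkeys keeps the first occurrence of each path and preserves order.
    ordered = list(dict.fromkeys([app_versions, *plugin_versions]))
    return os.pathsep.join(str(path) for path in ordered)


locations = build_version_locations(
    Path("migrations/versions"),
    [Path("plugins/users/versions"), Path("migrations/versions")],
)
print(locations)
```

Keeping the application's own directory first and de-duplicating is a design choice of this sketch, not something Alembic requires.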
This sounds like a plan 🙂 just read over it and i will take a deeper look into the doc the upcoming week
Is this something you are still interested in finishing? I worked with claude to help create a plan that would let us implement it gradually, if this is something you still want. Feel free to tweak it as needed, but I think it's not too far off the mark of something that's acceptable
I’m using Litestar and currently transitioning to Advanced Alchemy. I have an Advanced Alchemy service for working with entities (the structure is roughly similar to litestar-fullstack). Additionally, my backend implements fairly complex logic with reusable functions that require database access. How can I obtain the current database session in these functions? The function may be called from either a controller or a job. I’d prefer not to pass the service from the controller through all the functions down to the one that needs it. Currently, I’m creating a new session each time using async with alchemy.get_session() as session, which I believe is incorrect if a session is already created by the controller.
I’m responding from my phone so I can’t easily link line numbers. https://github.com/litestar-org/litestar-fullstack/blob/main/src/app/domain/accounts/guards.py
look at the provide_session in the guard. This provides the same session that was provided from the route.
@marsh jacinth Thank you for the response! I looked at the alchemy.provide_session(connection.app.state, connection.scope) function, but how do I obtain the connection.app.state and connection.scope arguments inside my helper function? To simplify, my function retrieves data from the database and transforms it into a specific format (performs analysis and returns the result). This small function is used everywhere. (Previously, I used MongoDB with a global database client object, so I had no issue accessing data.) Now, while transitioning to Advanced Alchemy, it’s unclear how to obtain the current session if it exists in the context (e.g., when the call comes from a controller).
Connection is a DI object available on any route automatically.
But there’s likely an even easier method. Can you share some code samples so I can guide you a bit better?
Here's a simplified example, but the approach is roughly like this.
get_prompt_template is an example of such a helper function that can be used in different parts of the application.
This is a working example, but it bothers me that I have to constantly create a new session to retrieve data, which may not be entirely correct. In some more complex cases, where both updating and reading data are needed within a single request, all operations should be performed within the same db_session.
If you are invoking from a route, you can just do this and they will all use the same db session. https://github.com/litestar-org/litestar-fullstack/blob/main/src/app/domain/accounts/controllers/user_role.py
Otherwise you can use the .new context manager on the service. And pass in the same session object on each service instantiation.
The code structure is such that in some cases, this function will be at the 4th or 5th level of nesting from the controller. For example, the controller analyse_news_intention([sources]) is called, and depending on the provided sources, the retrieve_from_source function is executed. After that, a pipeline for extracting and cleaning the code is triggered. Then, based on factors like text length or whether it’s scientific text, an LLM model is selected. After selecting the model, a service is called to choose the appropriate set of prompts (and here I need to access the database to select suitable models and prompts). This function will be called quite frequently. How can I access the database in this function using the existing session?
@marsh jacinth I have a question and would greatly appreciate your guidance. Could you kindly advise on the best way to access information about the current database session within a helper function that might be deeply nested from the controller, without needing to pass additional parameters down from the controller through all the functions?
I'm not sure what you are looking for that's not already there. you can reference the session on any repository class that's created. You can use the same session without using DI at all with something like this:
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import alchemy
from app.services import RoleService, TeamService, UserService
@asynccontextmanager
async def maybe_get_session(db_session: AsyncSession | None) -> AsyncGenerator[AsyncSession, None]:
    if db_session is None:
        async with alchemy.get_session() as session:
            yield session
    else:
        yield db_session
async def provide_all_the_services(db_session: AsyncSession | None = None) -> tuple[UserService, TeamService, RoleService]:
    async with maybe_get_session(db_session) as session, UserService.new(session) as user_service, TeamService.new(session) as team_service, RoleService.new(session) as role_service:
        return user_service, team_service, role_service
and call it like:
user_service, team_service, role_service = await provide_all_the_services()
user_service.repository.session == team_service.repository.session == role_service.repository.session
Thank you for the response! I was just thinking that since we already have the advanced-alchemy plugin integrated into Litestar, it would be great to have a function like get_db_session() that would return the current session or create a new one without needing to pass parameters, as in the context of a controller call, we should already have a database session.
I think we are just miscommunicating slightly on this. what you are asking for does exist
when you need a session outside of the scope of a request you use config.get_session. It will get a new session
when you need to get the session from the request scope, you use config.provide_session
db_session is automatically injected into each route for you
so, you just need to add it as a parameter and use it
Config.provide_session requires mandatory parameters, but it would be preferable to have the ability to simply obtain a session without explicitly passing them.
how do you propose something like that would work?
Retrieve the necessary information from the context, if possible.
Which context?
also, which request? if you have 50 active requests, you might have 50 different sessions querying
how do you get the sessions scoped to the current request?
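One way to make the "which context?" question concrete is Python's `contextvars`, sketched here with the standard library only. This is not something Advanced Alchemy or Litestar is shown to provide in this thread; `SessionSketch`, `bind_session`, and `get_db_session` are hypothetical names, and something at the edge of each request would still have to do the binding.

```python
import asyncio
import contextvars
from contextlib import contextmanager
from typing import Iterator, Optional


class SessionSketch:
    """Hypothetical stand-in for a database session."""

    def __init__(self, label: str) -> None:
        self.label = label


# Each asyncio task runs in its own copy of the context, so concurrent
# requests do not see each other's value.
_current_session: contextvars.ContextVar[Optional[SessionSketch]] = contextvars.ContextVar(
    "current_session", default=None
)


@contextmanager
def bind_session(session: SessionSketch) -> Iterator[None]:
    token = _current_session.set(session)
    try:
        yield
    finally:
        _current_session.reset(token)


def get_db_session() -> SessionSketch:
    """Return the session bound for this task, or a fresh one when none is bound."""
    session = _current_session.get()
    return session if session is not None else SessionSketch("new")


async def deeply_nested_helper() -> str:
    await asyncio.sleep(0)  # give the other request a chance to run
    return get_db_session().label


async def handle_request(label: str) -> str:
    with bind_session(SessionSketch(label)):
        return await deeply_nested_helper()


async def main() -> list[str]:
    inside = await asyncio.gather(handle_request("request-a"), handle_request("request-b"))
    outside = await deeply_nested_helper()  # nothing bound, e.g. a background job
    return [*inside, outside]


labels = asyncio.run(main())
print(labels)
```

The trade-off is implicit state: the helper no longer declares that it needs a session, and a fresh session created in the fallback path still has to be closed by whoever asked for it.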
Thank you so much for your help! It seems I need to dive a bit deeper into how Litestar works before asking more questions.
Is there a way to do a partial update of an object with SQLAlchemyAsyncRepositoryService? An unmodified instance seems to be using INSERT with all the fields using the update() method.
Yes. Check out the fullstack repository. The user and team edits support patch
Hmm, either I'm missing something very obvious, or maybe the mapping to sqlite specifically has a bug?
class JobStatusUpdate(Struct):
    status: JobStatus

    def to_dict(self) -> dict[str, Any]:
        return {
            f: getattr(self, f)
            for f in self.__struct_fields__
            if getattr(self, f, None) != UNSET
        }

class JobStatus(StrEnum):
    ACCEPTED = auto()
    TRANSFORMED = auto()
    COMPLETE = auto()

class Job(UUIDAuditBase):
    completed_at: Mapped[datetime | None] = mapped_column(default=None)
    status: Mapped[JobStatus] = mapped_column(default=JobStatus.ACCEPTED)
    template: Mapped[JobTemplate]
    transaction_id: Mapped[str] = mapped_column(unique=True, index=True)

class JobRepository(SQLAlchemyAsyncRepository[Job]):
    """Job repository."""
    model_type = Job

class JobService(SQLAlchemyAsyncRepositoryService[Job]):
    """Job service."""
    repository_type = JobRepository

class JobController(Controller):
    path = "/jobs"
    dependencies = create_service_dependencies(JobService, key="job_service")

    @patch("/{transaction_id:str}/status")
    async def update_job_status(
        self, transaction_id: str, data: JobStatusUpdate, job_service: JobService
    ) -> Job:
        return await job_service.update(
            item_id=transaction_id,
            id_attribute="transaction_id",
            data=data.to_dict(),
        )
IntegrityError: (sqlite3.IntegrityError) NOT NULL constraint failed:
job.template
[SQL: INSERT INTO job (completed_at, status, template,
transaction_id, id, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)]
[parameters: (None, 'TRANSFORMED', None, 'foo-bar', <memory at
0x1131fb700>, '2025-07-31 23:08:42.546409', '2025-07-31 23:08:42.546416')]
taking a look now, but one thing i see that can improve your type hinting is to add the repository into the service generic:
class JobRepository(SQLAlchemyAsyncRepository[Job]):
    """Job repository."""
    model_type = Job

class JobService(SQLAlchemyAsyncRepositoryService[Job, JobRepository]):
    """Job service."""
    repository_type = JobRepository
Also, I see that you are overriding the id_attribute in this case which is a bit non standard. There very well could be a bug, and I'll try to recreate this. But let me offer an alternative that is more likely to be closer to what you want anyway. (gimme a few...)
Something like this for the route:
@patch("/{transaction_id:str}/status")
async def update_job_status(
    self, transaction_id: str, data: JobStatusUpdate, job_service: JobService
) -> Job:
    data_to_update = data.to_dict()
    data_to_update["transaction_id"] = transaction_id
    return await job_service.get_and_update(
        match_fields=["transaction_id"],
        **data_to_update,
    )
I do think there was a bug specific to id_attribute and update. This PR should resolve the issue: https://github.com/litestar-org/advanced-alchemy/pull/502
Correctly merge attributes onto existing instance when using id_attribute and update
Also, I see that you are overriding the id_attribute in this case which is a bit non standard.
Bridging legacy systems is fun (they insist on using their own token, but won't make guarantees about it beyond "being unique", but it looks like a UUID with a prefix to me).
Something like this for the route:
get_and_update() of course i missed that one. better than my temporary workaround too:
@patch("/{transaction_id:str}/status")
async def update_job_status(
    self, transaction_id: str, status: JobStatus, job_service: JobService
) -> Job:
    job = await job_service.get(
        item_id=transaction_id, id_attribute="transaction_id"
    )
    job.status = status
    return await job_service.update(
        item_id=transaction_id,
        id_attribute="transaction_id",
        data=job,
    )
if you are able to test that branch and confirm your original code works that would be super helpful
if not, i think i've fixed the original code you had and you can verify on next release
If you have a shortcut for working off that branch in a project where it's pip installed, I'd be happy to.
are you using pip/requirements, pdm, uv?
uv
something like this: uv add "advanced-alchemy[<your extras>] @ git+https://github.com/litestar-org/advanced-alchemy.git@id-attribute-update"
I was just digging around over there: https://docs.astral.sh/uv/concepts/projects/dependencies/#git
There's a different traceback now from the change, but i have to trim it for discord, 1 sec.
2025-08-01T15:35:43.205723Z [error ] Uncaught exception connection_type=http path=/jobs/foo-foo/status
╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /Users/max/src/api/.venv/lib/python3.13/site-packages/advanced_al │
│ chemy/exceptions.py:286 in wrap_sqlalchemy_exception
│ 283 │
│ 284 │ """
│ 285 │ try:
│ ❱ 286 │ │ yield
│ 287 │
│ 288 │ except NotFoundError as exc:
│ 289 │ │ if wrap_exceptions is False:
│ /Users/max/src/api/.venv/lib/python3.13/site-packages/advanced_al │
│ chemy/repository/_async.py:1418 in update
│ 1415 │ │ │ │ │ if new_field_value is not MISSING:
│ 1416 │ │ │ │ │ │ existing_field_value = getattr(existing_insta │
│ 1417 │ │ │ │ │ │ if existing_field_value is not MISSING and ex │
│ ❱ 1418 │ │ │ │ │ │ │ setattr(existing_instance, field_name, ne │
│ 1419 │ │ │ instance = await self._attach_to_session(existing_instanc │
│ 1420 │ │ │ await self._flush_or_commit(auto_commit=auto_commit) │
│ 1421 │ │ │ await self._refresh(
... 4 frames hidden ...
│ /Users/max/src/api/.venv/lib/python3.13/site-packages/sqlalchemy/ │
│ orm/util.py:307 in set_
│ 305 │ │ def set_(state, value, oldvalue, initiator):
│ 306 │ │ │ if include_backrefs or not detect_is_backref(state, initi │
│ ❱ 307 │ │ │ │ return validator(state.obj(), key, value)
│ 308 │ │ │ else:
│ 309 │ │ │ │ return value
│ /Users/max/src/api/.venv/lib/python3.13/site-packages/advanced_al │
│ chemy/mixins/audit.py:26 in validate_tz_info
│ 24 │ @validates("created_at", "updated_at")
│ 25 │ def validate_tz_info(self, _: str, value: datetime.datetime) -> dat │
│ ❱ 26 │ │ if value.tzinfo is None:
│ 27 │ │ │ value = value.replace(tzinfo=datetime.timezone.utc)
│ 28 │ │ return value
╰─────────
AttributeError: 'NoneType' object has no attribute 'tzinfo'
strange. i see that in the test failures as well. this looks unrelated, i think
If you look at the first traceback, the UUIDAuditBase fields are present in the INSERT command, so it looks like it's being populated somewhere. I'm probably telling you what you already know though 😄
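The AttributeError above comes from calling .tzinfo on a None value inside the validator. A None-tolerant variant could look like the sketch below; ensure_utc is a hypothetical standalone helper, not the library's validate_tz_info and not necessarily the fix that was merged.

```python
import datetime
from typing import Optional


def ensure_utc(value: Optional[datetime.datetime]) -> Optional[datetime.datetime]:
    """Attach UTC to naive datetimes and pass None through untouched."""
    if value is None:
        # This is the case that raised "'NoneType' object has no attribute 'tzinfo'".
        return None
    if value.tzinfo is None:
        return value.replace(tzinfo=datetime.timezone.utc)
    return value


naive = datetime.datetime(2025, 8, 1, 15, 35, 43)
aware = ensure_utc(naive)
```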
@marsh jacinth It works with the new changes 🙂 advanced-alchemy==1.4.5 (from git+https://github.com/litestar-org/advanced-alchemy@03bcb266ae006357c448e1883dd0faffce3aad7d)
Does it support multiple db connections?
Yep
There are some examples I think. You just pass multiple configs in
I’ll grab one once I get back to my laptop if you can’t find it
I guess it's not documented =/.
it's something like this:
config1 = SQLAlchemyAsyncConfig(connection_string="sqlite+aiosqlite://")
config2 = SQLAlchemyAsyncConfig(
    connection_string="sqlite+aiosqlite://",
    session_dependency_key="other_session",
    session_scope_key="_sqlalchemy_state_2",
    engine_dependency_key="other_engine",
)
plugin = SQLAlchemyPlugin(config=[config1, config2])
you can then use one dependency at db_session and the other at other_session on any route.
Also, there's a long standing enhancement I've wanted to add in that makes the service aware of this type of setup. You'll have to add that functionality with a wrapper around the service for now.
this should work for every backend or standalone
to the advanced_alchemy folks here: using a Service with default_load_options, how can I do something like this example: session.query(User).options(subqueryload(User.addresses).load_only(Address.email_address))? In other words, I need to load from the relation only the entries matching a condition (i.e. is_deleted != false, or think of any other condition). How would I do that inside default_load_options, or is there any other way from the ORM model to do this?
get_one_or_none(..., load=[subqueryload(...).load_only(...)])
or if you don't need something manipulated on the joined model, you can use the simple shorthand here: https://github.com/litestar-org/litestar-fullstack/blob/main/src/app/domain/teams/controllers/teams.py#L33
if you do the shorthand, it looks at the model relationship. it does a subquery load for anything that's a list relationship
and joined for those that are not
Not sure if I would call it a bug but I have an issue with the silent fallback of nanoid to uuid without even a warning when fastnanoid is not installed in NanoIDPrimaryKey. I would prefer a clean raised error. What is the advantage here? A silent fallback? Is that what you want? I just noticed it because of SQL errors because sqlite does not support uuid types.
What if the package is missing later... would it try a migration? Will the fallback even work with relationships? those would be the wrong type anyways?
it was originally only done for the uuid7. it was mainly to limit the amount of conditional installation blocks we had.
it's not the wrong type to the database since it's just a raw (or maybe a string for nanoid).
we can look at adding a warning in these cases though
the nanoid implementation just mirrored the uuid type when it was added
I see... a warning sounds good .)
I have an issue updating to advanced_alchemy 1.5.
I'm using a model with column_property. When calling update the function column_has_defaults from here (https://github.com/litestar-org/advanced-alchemy/blob/25a37b193a66644548c1a1996bf9a72a365d3ff9/advanced_alchemy/repository/_util.py#L344) is called which fails, as the column_property creates no attribute default.
I was not able to create a failing test in advanced-alchemy yet, but maybe you already see why this fails.
If I manually check whether column is a sqlalchemy.Label and return False, my code works again, but I don't know if this is a valid solution.
Can you confirm if the issues persists when running from the main branch? I just merged 2 fixes for update and update_many this morning that haven't been released yet.
error is still there:
│ /home/franz/Workspaces/python/romecat-backend/.venv/lib/python3.13/site-pack │
│ ages/advanced_alchemy/repository/_util.py:360 in column_has_defaults │
│ │
│ 357 │ │ bool: True if the column has any type of default or update han │
│ 358 │ """ │
│ 359 │ return ( │
│ ❱ 360 │ │ column.default is not None │
│ 361 │ │ or column.server_default is not None │
│ 362 │ │ or column.onupdate is not None │
│ 363 │ │ or column.server_onupdate is not None │
│ │
│ /home/franz/Workspaces/python/romecat-backend/.venv/lib/python3.13/site-pack │
│ ages/sqlalchemy/sql/elements.py:1508 in __getattr__ │
│ │
│ 1505 │ │ try: │
│ 1506 │ │ │ return getattr(self.comparator, key) │
│ 1507 │ │ except AttributeError as err: │
│ ❱ 1508 │ │ │ raise AttributeError( │
│ 1509 │ │ │ │ "Neither %r object nor %r object has an attribute %r" │
│ 1510 │ │ │ │ % ( │
│ 1511 │ │ │ │ │ type(self).__name__, │
╰──────────────────────────────────────────────────────────────────────────────╯
AttributeError: Neither 'Label' object nor 'Comparator' object has an attribute
'default'
thanks. we'll fix this before doing the next release
The column in question is a column_property. In this case column is of type <class 'sqlalchemy.sql.elements.Label'>.
The column from the model:
product_name: Mapped[str] = column_property(
    sa.select(Product.product_name).where(Product.id == sa.literal_column("lot.product_id")).scalar_subquery()
)
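A defensive version of the check that tolerates Label objects could look like the sketch below. column_has_defaults_safe is a hypothetical name, and this is not necessarily what the eventual fix in the library does; it just shows that getattr with a default sidesteps the AttributeError raised by Label.

```python
import sqlalchemy as sa

_DEFAULT_ATTRS = ("default", "server_default", "onupdate", "server_onupdate")


def column_has_defaults_safe(column) -> bool:
    """True if the column-like object carries any default/onupdate handler.

    Objects without these attributes (e.g. a Label produced by
    column_property) are treated as having no defaults.
    """
    return any(getattr(column, attr, None) is not None for attr in _DEFAULT_ATTRS)


label = sa.literal_column("1").label("product_name")    # stands in for the column_property
with_default = sa.Column("status", sa.String, default="new")
plain = sa.Column("note", sa.String)
```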
Thanks a lot 🙂
Does this address your issue? https://github.com/litestar-org/advanced-alchemy/pull/539
Property correction for updates
Yes, I can confirm this works 🙂
Thanks
This test is failing now as well https://github.com/litestar-org/litestar-fullstack/blob/960d63426151f41d0528548fec48bf7c6cc43d6a/tests/integration/test_teams.py#L92 but this is a different issue:
IntegrityError: (raised as a result of Query-invoked autoflush; consider using a
session.no_autoflush block if this flush is occurring prematurely)
(sqlalchemy.dialects.postgresql.asyncpg.IntegrityError) <class
'asyncpg.exceptions.NotNullViolationError'>: null value in column "team_id" of
relation "team_member" violates not-null constraint
DETAIL: Failing row contains (5ef29f3c-3560-4d15-ba6b-a2e5c721e4d2, null,
MEMBER, f, 01991059-91e5-7cc0-bf08-af125facfd65, null, 2025-09-03
16:12:09.573982+00, 2025-09-03 16:12:09.573984+00, null).
[SQL: INSERT INTO team_member (user_id, team_id, role, is_owner, id, created_at,
updated_at, deleted_at) VALUES ($1::UUID, $2::UUID, $3::VARCHAR, $4::BOOLEAN,
$5::UUID, $6::TIMESTAMP WITH TIME ZONE, $7::TIMESTAMP WITH TIME ZONE,
$8::TIMESTAMP WITH TIME ZONE)]
[parameters: ('5ef29f3c-3560-4d15-ba6b-a2e5c721e4d2', None, <TeamRoles.MEMBER:
'MEMBER'>, False, '01991059-91e5-7cc0-bf08-af125facfd65',
datetime.datetime(2025, 9, 3, 16, 12, 9, 573982, tzinfo=datetime.timezone.utc),
datetime.datetime(2025, 9, 3, 16, 12, 9, 573984, tzinfo=datetime.timezone.utc),
None)]
(Background on this error at: https://sqlalche.me/e/20/gkpj)
The above exception was the direct cause of the following exception:
...
ok, who knew one change to update would cascade like this. There's so many edge cases
but this test failed before https://github.com/litestar-org/advanced-alchemy/pull/539 already, just tested with the version before. But something between 1.4.5 and now as well.
This will also be corrected before we trigger the next release
Hey all, wondering if you've put any thought into a slightly awkward handoff with a nullable FileObject between the underlying SQL JSON 'null' string value and None values in Python. If someone doesn't investigate the underlying SQL representation, they're likely to stumble when trying to filter null values, e.g.
class Job(UUIDAuditBase):
    content: Mapped[FileObject | None] = mapped_column(
        StoredObject(backend=storage_backend), info=dto_field("private")
    )
    completed_at: Mapped[datetime | None] = mapped_column(default=None)

async def job_sweeper() -> None:
    """Removes content and output for jobs created more than 30 days ago"""
    async with sqlalchemy_config.get_session() as db_session:
        job_provider = create_service_provider(JobService)
        job_service = await anext(job_provider(db_session))
        old_jobs = await job_service.list(
            Job.created_at < (datetime.now(UTC) - timedelta(days=30)),
            # doesn't work because it generates IS NOT NULL and the value is 'null'
            # Job.content.is_not(None),
            Job.content != text("'null'"),
        )
        for job in old_jobs:
            job.content = None  # -> 'null'
            job.output = None  # -> 'null'
        await job_service.update_many(old_jobs, auto_commit=True)
One possible way around it might be setting none_as_null=True when initializing the underlying class, but I don't know how deep that change would be: https://github.com/litestar-org/advanced-alchemy/blob/11f5b683f1122328d207d7469f286860412cd7ad/advanced_alchemy/types/json.py#L78
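The mismatch is easy to reproduce without any ORM: a JSON-serialized None is the four-character string 'null', which is not SQL NULL. A stdlib-only sketch (the job table here is a made-up stand-in, not the real schema):

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE job (id INTEGER PRIMARY KEY, content TEXT)")
conn.execute("INSERT INTO job (content) VALUES (?)", (json.dumps(None),))  # stored as 'null'
conn.execute("INSERT INTO job (content) VALUES (?)", (None,))              # real SQL NULL
conn.execute("INSERT INTO job (content) VALUES (?)", (json.dumps({"path": "a.txt"}),))

# IS NOT NULL still matches the row holding the JSON 'null' string.
not_null_ids = [row[0] for row in conn.execute(
    "SELECT id FROM job WHERE content IS NOT NULL ORDER BY id"
)]

# Excluding the JSON null explicitly leaves only rows with real content.
has_content_ids = [row[0] for row in conn.execute(
    "SELECT id FROM job WHERE content IS NOT NULL AND content != 'null' ORDER BY id"
)]
```

SQLAlchemy's JSON type has a none_as_null flag that persists Python None as SQL NULL instead; whether flipping it inside the library's type is safe for existing data is the open question raised above.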
I realise that there was a regression between AA 1.5.0 and 1.6.2 to do with mapped attributes. Coming out the other side of it though, seems there's a new bug that wasn't around in 1.4.5 to do with Numpy mapped types (i.e. pgvector's Vector type):
File ".../.venv/lib/python3.13/site-packages/advanced_alchemy/service/_async.py", line 775, in update
await self.repository.update(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
...<11 lines>...
),
^
File ".../.venv/lib/python3.13/site-packages/advanced_alchemy/repository/_async.py", line 1505, in update
if existing_field_value is not MISSING and existing_field_value != new_field_value:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()
I can't see any special handling previously, but the same dependencies with only AA 1.4.5 doesn't throw the same error.
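The ValueError comes from using the result of an element-wise != in an if. A comparison helper that falls back to .any(), as the error message suggests, might look like this sketch. values_differ is a hypothetical name, and ElementwiseVector is a tiny stdlib stand-in that mimics numpy's behaviour so the example runs without numpy; it is not the fix that landed in the library.

```python
class _ElementwiseResult:
    """Mimics the boolean array numpy returns from `a != b`."""

    def __init__(self, flags):
        self.flags = list(flags)

    def any(self) -> bool:
        return any(self.flags)

    def __bool__(self) -> bool:
        if len(self.flags) > 1:
            raise ValueError("The truth value of an array with more than one element is ambiguous.")
        return bool(self.flags and self.flags[0])


class ElementwiseVector:
    """Stand-in for an array type whose != compares element by element."""

    def __init__(self, values):
        self.values = list(values)

    def __ne__(self, other):
        return _ElementwiseResult(a != b for a, b in zip(self.values, other.values))


def values_differ(old, new) -> bool:
    """Return True if old and new differ, tolerating element-wise comparisons."""
    result = old != new
    try:
        return bool(result)
    except ValueError:
        # Element-wise result: any differing element means the value changed.
        return bool(result.any())


same = values_differ(ElementwiseVector([1.0, 2.0]), ElementwiseVector([1.0, 2.0]))
changed = values_differ(ElementwiseVector([1.0, 2.0]), ElementwiseVector([1.0, 3.0]))
```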
I can try get an MCVE whipped up for this tomorrow if helpful Cofin, but you always seem to know where these issues are coming from before I so much as have time to explain them further
I think i got it from here actually.
Truly a magician
If i don’t have anything fixed by tomorrow, then an mcve would be good
sounds good, I'll keep an eye out
Sorry a little late on the reply. saw it yesterday but bit of a busy one. Looks like it solves the issue how Numpy want the arrays compared but I'll test it this afternoon if I can find some time
@marsh jacinth looks like it works a charm
at the very least, it's not failing any of my tests it was failing on earlier, and I threw a few extra tests in that also passed
Cool. I’ll add bit of coverage to make the code quality checks happy
I'm seeing an issue where the has_dict_attribute function (/service/typing.py:254) always returns True.
Minimal example:
from advanced_alchemy.service.typing import has_dict_attribute
has_dict_attribute(1) # True
has_dict_attribute([1, 2, 3]) # True
This is with advanced-alchemy 1.6.3 and python 3.12.11 on Linux. Also reproduced on python 3.13.7.
Is it worthwhile to instead do a hasattr check? That might also be faster, I believe.
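A likely explanation, sketched with a stand-in protocol (the real DictProtocol definition may differ): the typing module skips special names such as __dict__ when collecting protocol members, so a runtime-checkable protocol that only declares __dict__ has nothing left to check and matches everything. A plain hasattr check behaves as intended.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class DictProtocol(Protocol):
    """Stand-in: a protocol whose only declared member is __dict__."""

    __dict__: dict[str, Any]


class Plain:
    """Ordinary class; its instances do carry a __dict__."""


def has_dict_attribute_via_hasattr(obj: Any) -> bool:
    return hasattr(obj, "__dict__")


protocol_results = [isinstance(obj, DictProtocol) for obj in (1, [1, 2, 3], Plain())]
hasattr_results = [has_dict_attribute_via_hasattr(obj) for obj in (1, [1, 2, 3], Plain())]
```

With the protocol check all three objects match; with hasattr only the Plain instance does, since int and list instances have no __dict__.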
Hey @marsh jacinth, we started to get an error after version 1.5. (tried on 1.6.3)
I think the problem is new merge algorithm.
https://github.com/litestar-org/advanced-alchemy/blob/725e26a4324f9f2b297109a16759e59372084693/advanced_alchemy/repository/_async.py#L1525
The data resolved from the DTO backend passes related objects in a transient state rather than a persistent one, and transient objects don't support merge.
I tried to create an MRE with just AA, but there the data is created in a persistent state
sqlalchemy.exc.InvalidRequestError: merge() with load=False option does not support objects transient (i.e. unpersisted) objects. flush() all changes on mapped instances before merging with load=False.
I have prepared an MRE:
https://gist.github.com/hasansezertasan/c2d682078cf678b2fdb399b4d1c1be98
Thanks 🙂
Ok, we'll get this fixed. this has been a pain to address.
I'll take a look this afternoon
I'm experiencing different behaviour unrelated to relationships, but with the update method as well
https://github.com/litestar-org/advanced-alchemy/issues/560
Description
Release v1.5.0 made breaking changes to the repository update method.
See the self contained script below using v1.4.5 (which passes)
The bug is causing failures as seen here:
https://github.com/mvbosch/litestar-users/issues/98
URL to code causing the issue
No response
MCVE
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "advanced-alchemy==1.4.5",
# ]
# ///
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker
from advanced_alchemy.repository import SQLAlchemySyncRepository
from advanced_alchemy.base import UUIDBase

class User(UUIDBase):
    """Simple User model for testing."""
    __tablename__ = "users"
    name: str = Column(String, nullable=False)
    age: int = Column(Integer, nullable=True)

class UserRepository(SQLAlchemySyncRepository[User]):
    """Repository for User model."""
    model_type = User

def main() -> None:
    engine = create_engine("sqlite:///:memory:", echo=False)
    User.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        repo = UserRepository(session=session)
        created_user = repo.add(User(name="John Doe", age=30))
        session.commit()
        repo.update(User(id=created_user.id, age=25))
        session.commit()
        fetched_user = repo.get(created_user.id)
        assert fetched_user.name == "John Doe", f"Expected name 'John Doe', got '{fetched_user.name}'"
        assert fetched_user.age == 25, f"Expected age 25, got {fetched_user.age}"
        assert fetched_user.id == created_user.id, "User ID should remain the same"
        print(f"User after update: id={fetched_user.id}, name={fetched_user.name}, age={fetched_user.age}")
    except Exception as e:
        print(f"\nError occurred: {e}")
        raise
    finally:
        session.close()

if __name__ == "__main__":
    main()
Steps to reproduce
Change the MCVE dependency to "advanced-alchemy==1.5.0" and execute the script.
Screenshots
N/A
Logs
Error occurred: There was a data validation error during processing
Traceback (most recent call last):
File "/home/developer/.cache/uv/environments-v2/aa-v-1-5-0-6ef1f15e67cc0d8b/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
self.dialect.do_execute(
File "/home/developer/.cache/uv/environments-v2/aa-v-1-5-0-6ef1f15e67cc0d8b/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 951, in do_execute
cursor.execute(statement, parameters)
sqlite3.IntegrityError: NOT NULL constraint failed: users.name
The above exception was the direct cause ...
https://github.com/litestar-org/advanced-alchemy/pull/563
can you give this a try?
Problem
In v1.5.0, the update() method was refactored to manually copy field
values from the input data to the existing instance. However, when a
user creates a partial update like User(id=1, age=25), SQLAlchemy
initializes ALL column attributes (including 'name') to None. The old
code couldn't distinguish between "explicitly set to None" vs
"uninitialized and defaults to None", causing it to overwrite existing
valid data with None and trigger NOT NULL constraint failures.
Solution
Added was_attribute_set() helper function that uses SQLAlchemy's
instance state inspection to check which attributes were actually
modified/set on the input instance. The update() method now only copies
attributes that have been explicitly set by the user.
Changes
- Added was_attribute_set() function to _util.py that checks:
  - Attribute history (has_changes()) for modified attributes
  - Instance __dict__ for attributes set during init
- Updated both async and sync repository update() methods to check if
attributes were explicitly set before copying them
Closes
Fixes #560
What does everyone want in a v2 AA? I think we’ll want to consider a major bump when removing 3.9 support.
I suggest to add a pinned issue on GitHub for that and maybe add at the top of the README a look to it (both for visibility as I am not sure all the AA users are on Discord)
is this something you want to do? I think we need to start considering what we want to do. maybe there will be no major breaking changes here
I can do later today (going through a lot of meetings in few minutes ^^)
no rush. thanks
Hi cofin, thanks for this. I've tested it on the provided example and it looks good
I haven't tried on a proper project, but I'm sure it will be fine.
Okay, adding that dependency to an existing project and running integration tests was easier than I thought.
That patch is all good 🙂
Hello everyone, we are planning the 2.0 release and would like your feedback.
Litestar database commands seem a bit limited and the env.py I copied from litestar-fullstack seems unable to handle direct alembic commands
For instance
the litestar database stamp does not have a --purge option
How am I supposed to handle this?
we need to fix it
alembic doesn't use click (at least it didn't use to) so we have to re-implement the commands
i probably just missed it
Is there already an open issue for it?
I've been refining an AI workflow for these kinds of bugs and it's probably an easy fix for it (famous last words)
Lol, I think I have seen it work reasonably well as long as the repos follow best practices and are written well with good typing etc. see pydantic-ai use claude in their repo for example.
Also no issue yet, i wanted to confirm if this is not intentional and there is not an escape hatch to alembic I was missing or something
no, i likely just missed this
i don't often use stamp, so there's probably something there
Description
This is incomplete/incorrect
The alembic stamp command has more options
https://alembic.sqlalchemy.org/en/latest/api/commands.html#alembic.command.stamp
Steps to reproduce
1. uv run litestar database stamp --help
Package Version
v1.6.3
Platform
- ✅ Linux
- ◻️ Mac
- ◻️ Windows
- ◻️ Other (Please specify in the description above)
@marsh jacinth I am following the litestar-fullstack example and copied over the migrations folder but it already has a pre generated revision (I removed it since I have different models)
I am finding that on a fresh db
Doing
uv run litestar database make-migrations
Results in just the alembic tables being created and a message
The generation of a migration file is being skipped because it would result in an empty file.
Am I missing something?
# add your model's MetaData object here
# for 'autogenerate' support
target_metadata = orm_registry.metadata
How do I get autogenerate support?
How does this actually work? How does it know where to look for my models just based on the orm_registry.metadata?
that's correct. we may need to make sure fullstack is using the latest templates. it's on the list to be updated soon. you also have to have your models imported at the time alembic runs. so you can typically do this by adding them into the init and ensuring that the env.py is importing it.
let me find the links. first all models are exposed in src/app/db/models/__init__.py
then in the on_app_init of the core, i use from app.db import models as m
so by the time alembic runs, it's already in the pythonpath/imported modules
this is in here src/app/server/core.py
Yeah I saw that but I guess I don't understand on_cli_init vs on_app_init
I thought the on_app_init only runs when you actually run the server.
Only on_cli_init runs for the cli commands. Am I mistaken? If so then why do you need both?
let me go refresh my memory before i say more. But the gist is that you end up setting the metadata with the config or using the default one, when you import the model it gets added into the registry that SQLAlchemy is tracking. since we invoke the alembic command through the CLI plugin, we can ensure we have imported the model. I had the model also in the on app init for the signature namespace addition, so it was for an unrelated reason.
Well it worked when I imported it to on_app_init. Didn't realize on_app_init ran even when you don't actually start the server. It does make sense to have it accessible for the cli operations.
I also added to the signature namespace {'m': m}. Don't really know why 😅
But since I am importing it might as well use it for something
you'll get an error on occasion with a forward reference if the import is inside an if TYPE_CHECKING block. adding them to the signature namespace allows you to keep them in the typing block but still be able to resolve them at runtime.
i just imported the entire models submodule and added that so i didn't have to do every potential model
and then in the code you'll see me just import from app.db import models as m and then something like m.User
i do something similar for schemas
so it's s.User
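The underlying Python behaviour can be shown with the stdlib alone. This sketch is not Litestar's signature_namespace implementation; it only illustrates the general mechanism: a name imported under TYPE_CHECKING does not exist at runtime, so resolving the annotation fails unless the name is supplied through a namespace.

```python
import decimal
from typing import TYPE_CHECKING, get_type_hints

if TYPE_CHECKING:
    # Visible to the type checker only; never executed at runtime.
    from decimal import Decimal


def format_price(value: "Decimal") -> str:
    return str(value)


# Without help, the forward reference cannot be resolved at runtime.
try:
    get_type_hints(format_price)
    resolved_without_namespace = True
except NameError:
    resolved_without_namespace = False

# Supplying the name explicitly (the role a signature namespace plays)
# makes the same annotation resolvable.
hints = get_type_hints(format_price, localns={"Decimal": decimal.Decimal})
```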
Also @marsh jacinth what do the following in the script.py.mako do?
sa.GUID = GUID
sa.DateTimeUTC = DateTimeUTC
sa.ORA_JSONB = ORA_JSONB
sa.EncryptedString = EncryptedString
sa.EncryptedText = EncryptedText
I get type checking issues in revisions because these get auto added to my migration scripts. I am starting to think I want to not type check migration files, but these are the only issues I see
It's all attribute access issue about sqlalchemy not having any GUID etc.
I exclude this from my type checking. but these blocks specifically ensure alembic doesn't have issues using the custom types we have when generating migrations.
Opened a tiny PR for this:
https://github.com/litestar-org/advanced-alchemy/pull/579
Description
The previous implementation used isinstance against the DictProtocol type, which returned True for any object.
For example:
from advanced_alchemy.service.typing import has_dict_attribute
has_dict_attribute(1) # True
has_dict_attribute([1, 2, 3]) # True
The isinstance call is replaced with hasattr (which should also be faster). With this change, I believe the DictProtocol class could also be removed, but I kept the changes to a minimum.
@marsh jacinth is there any concern i create a PR for column ordering? i saw this feature removed due to bugs
https://github.com/litestar-org/advanced-alchemy/pull/434/files
sqlalchemy support this since 2.0, tested and it works on postgres
Removes column re-ordering component that was incorrectly causing incorrect constraints to be generated.
Fixes #427
nvm, going to do it anyway 
let me know if you have any concern, or prefer not to have magic numbers
do you know if this will trigger alter statements for an existing app using these?
No issues adding this back if it's working
IIRC, it was causing strange issues with some of the other (non postgres) adapters. that may be resolved now though.
it wont unless they set to recreate table
we no longer need to take care of this right? Engine config automatically seems to fix this by ensuring serializer returns a str
this avoids the extra encode/decode cycle
ooo, yeah i am dumb
Hey @marsh jacinth re: the latest AA update (I think the property changes you made)
Docs say that protected properties (denoted with leading _) are ignored. Looks like they're still evaluated for forward references though, so you can't have private properties that reference type-checking imported only types like you can for e.g. relationships. Is that expected behaviour?
There's also some changed behaviour around using builtins in hybrid_properties (i.e. in the Python definition as opposed to the DBMS side expression). Easy for me, I just converted those over to properties but could see that being problematic in some edge cases. Started on an MCVE but want to confirm whether you'd expect something like this to be valid:
@hybrid_property
def total_cost(self) -> float:
    total = 0.0
    for row in self.rows:
        total += row.unit_cost * row.count * (1 - row.discount_rate)
    return round(total, 2)

@total_cost.inplace.expression
def _total_cost_expression(cls) -> ColumnElement[float]:  # noqa: N805
    from .order_row import OrderRow
    return func.round(
        func.coalesce(
            select(func.sum(OrderRow.unit_cost * OrderRow.count * (1 - OrderRow.discount_rate)))
            .where(OrderRow.order_id == cls.id)
            .correlate(cls)
            .scalar_subquery(),
            0.0,
        ),
        2,
    )
(very rough example, didn't have an inplace expression beforehand)
litestar database make-migrations didn't detect my models
This comes up a lot. You have the same constraints normal alembic does.
You need to make sure the model is imported before the alembic code runs.
We will fix that
use https://docs.advanced-alchemy.litestar.dev/latest/
for now
@marsh jacinth I noticed that when using advanced alchemy and msgspec, you cannot add type decoders like you do with litestar, since it's fixed to DEFAULT_TYPE_DECODERS:
dec_hook=partial(
    _default_msgspec_deserializer,
    type_decoders=DEFAULT_TYPE_DECODERS,
),
Shouldn't it include the type_decoders set in the app config?
I had to add this to the geoalchemy plugin shared here (a message in #general) to make it work:
# Patch advanced_alchemy's DEFAULT_TYPE_DECODERS to include geometry decoders
from advanced_alchemy.service._util import DEFAULT_TYPE_DECODERS
geometry_decoder_tuple = (geometry_predicate, geometry_decoder)
if geometry_decoder_tuple not in DEFAULT_TYPE_DECODERS:
    DEFAULT_TYPE_DECODERS.append(geometry_decoder_tuple)
Yes, this is an issue that we've avoided being reported till recently. Because adv alchemy supports more than just litestar, it was easier to build without this level of integration at the start. We need to improve this so you can pass in the kinds of settings and the litestar plugin can automatically inject the ones from the routes.
Would you mind opening an issue with a feature request to provide some way to customize the serializers?
enable_touch_updated_timestamp_listener: bool = True
"""Enable Created/Updated Timestamp event listener.
This is a listener that will update ``created_at`` and ``updated_at`` columns on record modification.
Disable if you plan to bring your own update mechanism for these columns"""
am i supposed to set it to false if i use on_update on the model?
updated_at = Column(DateTime, onupdate=datetime.utcnow)
the listener always forces a timezone aware timestamp
Yes - you may find cases where the listener was updating timestamps and the onupdate won't though.
huh do you have an example when that can happen?
Yeah, the first one that comes to mind is polymorphic relationships.
In those cases, your model is spread across 2 tables (in one of the configs)
This was at least the edge case that came up when we originally discussed it - I'd have to verify if we even have a test case / script to demonstrate it at this point. it's been a few years.
Nice, our table columns are not TZ aware (I know, I know) we are not ready to do a migration yet.
I am just introducing Litestar to our stack, so I had to change our models to use DefaultBase
and
I had to replace func.now() with datetime.utcnow
Because I ran into
https://github.com/litestar-org/litestar/issues/3457#issuecomment-2169884392
And then I ran into this lol,
I would just make a new AuditBase with whatever changes you need in this case. It's easy to extend the existing stuff so it all works correctly with advanced alchemy. Also, you may not need to disable the listener if you just changed the default here and don't use the DateTimeTZ custom type in Adv Alchemy.
Can you clarify the last part
Also, you may not need to disable the listener if you just changed the default here and don't use the DateTimeTZ custom type in Adv Alchemy.
I don't use the custom type I think
you do automatically if you are using the AuditMixin. which may be the thing you are trying to fix
import datetime
from sqlalchemy.orm import Mapped, declarative_mixin, mapped_column, validates
from advanced_alchemy.types import DateTimeUTC
@declarative_mixin
class AuditColumns:
    """Created/Updated At Fields Mixin."""
    created_at: Mapped[datetime.datetime] = mapped_column(
        DateTimeUTC(timezone=True),
        default=lambda: datetime.datetime.now(datetime.timezone.utc),
        sort_order=3002,
    )
    """Date/time of instance creation."""
    updated_at: Mapped[datetime.datetime] = mapped_column(
        DateTimeUTC(timezone=True),
        default=lambda: datetime.datetime.now(datetime.timezone.utc),
        onupdate=lambda: datetime.datetime.now(datetime.timezone.utc),
        sort_order=3003,
    )
    """Date/time of instance last update."""
    @validates("created_at", "updated_at")
    def validate_tz_info(self, _: str, value: datetime.datetime) -> datetime.datetime:
        # Naive values are assumed to be UTC and made timezone aware.
        if value.tzinfo is None:
            value = value.replace(tzinfo=datetime.timezone.utc)
        return value
class UUIDAuditBase(CommonTableAttributes, UUIDPrimaryKey, AuditColumns, AdvancedDeclarativeBase, AsyncAttrs):
    __abstract__ = True
this is all you need to replace it. this is the current implementation
But I use the DefaultBase which doesn't seem to use the AuditMixin
Or is your suggestion to actually use a custom AuditBase and that way I don't have to disable the listener
well, they aren't coupled this tightly. as long as you have the updated_at column on your model, the event listener will see it.
but, I was suggesting if you didn't want to add these columns on every model, you can create a custom audit base and it will integrate with anything else in AA.
DefaultBase is fine too btw. It's just this:
class DefaultBase(CommonTableAttributes, AdvancedDeclarativeBase, AsyncAttrs):
__abstract__ = True
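A minimal sketch of what a custom audit mixin with naive (non-TZ) columns could look like. It uses plain SQLAlchemy so it runs on its own. `NaiveAuditColumns`, `naive_utcnow` and `Workflow` are hypothetical names, and `Base` here is only a stand-in for whatever base the project uses (for example `DefaultBase`). It assumes the naive columns are meant to hold UTC values.

```python
import datetime

from sqlalchemy import Column, DateTime, Integer
from sqlalchemy.orm import declarative_base, declarative_mixin


def naive_utcnow() -> datetime.datetime:
    """Current UTC time with tzinfo stripped, for timezone-naive columns."""
    return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)


@declarative_mixin
class NaiveAuditColumns:
    """Created/updated columns that store naive UTC timestamps (hypothetical)."""

    created_at = Column(DateTime(timezone=False), default=naive_utcnow)
    updated_at = Column(
        DateTime(timezone=False),
        default=naive_utcnow,
        onupdate=naive_utcnow,
    )


Base = declarative_base()  # stand-in for the project's real declarative base


class Workflow(NaiveAuditColumns, Base):
    __tablename__ = "workflow"
    id = Column(Integer, primary_key=True)
```

Note that a mixin like this only changes the column defaults. A listener that assigns a timezone-aware value to `updated_at` at flush time would still need to be disabled or replaced separately.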
Yeah but that means it's still a problem to leave the listener on.
The listener does not seem to respect that
updated_at = Column(DateTime, onupdate=datetime.utcnow)
My column is naive
And it seems to run/take over the onupdate and causes an error since my DB cannot handle tzinfo
I verified that listener ran first by placing an event on the updated_at column
@event.listens_for(Workflow.updated_at, "set", retval=False)
def debug_updated_at_set(target, value, oldvalue, initiator):
    if value is not None and value.tzinfo is not None:
        print("TZ-AWARE updated_at detected:", value)
        traceback.print_stack(limit=15)
    return value
This was in the trace
File "/Users/vikash/Projects/Alexi/lexalr/.venv/lib/python3.12/site-packages/advanced_alchemy/_listeners.py", line 469, in touch_updated_timestamp
instance.updated_at = datetime.datetime.now(datetime.timezone.utc)
You can see that it's passing a utc aware timestamp. I think given that my DB column is not tz aware this line in the listener will always be a problem for me though right?
that would seem to indicate that. let me go look at that code to refresh my memory
yeah, you are correct
here's the listener:
def touch_updated_timestamp(session: "Session", *_: Any) -> None:  # pragma: no cover
    """Set timestamp on update.

    Called from SQLAlchemy's
    :meth:`before_flush <sqlalchemy.orm.SessionEvents.before_flush>` event to bump the ``updated``
    timestamp on modified instances.

    Args:
        session: The sync :class:`Session <sqlalchemy.orm.Session>` instance that underlies the async
            session.
    """
    for instance in session.dirty:
        state = inspect(instance)
        if not state or not hasattr(state.mapper.class_, "updated_at") or state.deleted or instance in session.new:
            continue
        updated_at_attr = state.attrs.get("updated_at")
        if not updated_at_attr or updated_at_attr.history.added:
            # Respect explicit user assignments such as manual overrides or import routines
            continue
        if _has_persistent_column_changes(state, skip_keys={"updated_at"}):
            instance.updated_at = datetime.datetime.now(datetime.timezone.utc)
you can just replace this with your own if you want the same behavior
register your new function like so:
if self.enable_touch_updated_timestamp_listener:
    from sqlalchemy import event
    from sqlalchemy.orm import Session
    from advanced_alchemy._listeners import touch_updated_timestamp
    event.listen(Session, "before_flush", touch_updated_timestamp)
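As a rough sketch of the "bring your own" route: a replacement listener that writes a naive UTC timestamp, for use with `enable_touch_updated_timestamp_listener` set to `False`. This is not the library's implementation. It swaps the private `_has_persistent_column_changes` helper for SQLAlchemy's public `Session.is_modified()`, so edge-case behavior may differ. `Workflow`, `naive_utcnow` and `touch_updated_timestamp_naive` are hypothetical names.

```python
import datetime

from sqlalchemy import Column, DateTime, Integer, String, event, inspect
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()  # stand-in for the project's real base


class Workflow(Base):
    """Hypothetical model with a timezone-naive updated_at column."""

    __tablename__ = "workflow"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    updated_at = Column(DateTime)


def naive_utcnow() -> datetime.datetime:
    """Current UTC time with tzinfo stripped."""
    return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)


def touch_updated_timestamp_naive(session, flush_context, instances):
    """before_flush hook: bump updated_at on modified rows, without tzinfo."""
    for instance in session.dirty:
        if not hasattr(instance, "updated_at"):
            continue
        if inspect(instance).attrs.updated_at.history.added:
            # An explicit assignment by the caller wins over the listener.
            continue
        if session.is_modified(instance):
            instance.updated_at = naive_utcnow()


# Registered on the Session class, mirroring the snippet above.
event.listen(Session, "before_flush", touch_updated_timestamp_naive)
```

With this registered, changing `name` on a persisted `Workflow` and flushing should leave a naive `updated_at` on the row, while rows that were only loaded stay untouched.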
Yeah I am not sure if at this point the listener is worth it. We will push through this FastAPI to Litestar migration first and then do a DB migration to make our columns tz aware, which is probably ideal anyway
Makes sense. Let me know if you have any other issues with it. you should be able to layer these things back in when you are ready.
I did raise a question on the Unexpected default type issue about why it's not a bug
Is the problem that AA does not know the type of default=func.now
no, i think that's the incorrect way to set this. func.now is a server side default right?
does this work normally?
Yeah but SQLAlchemy does differentiate between
default=func.now()
server_default=func.now()
Even though it's very confusing
it's server_default and server_onupdate
I think server_default assumes the ORM does literally nothing and the DB will fill it in. But you need a migration
default=func.now()
ORM Basically writes a query with now()
ok, let me rephrase to see if this is still correct
you are saying there's a DDL level default from alembic when server_default is setup. But SQLAlchemy doesn't auto add the now() inline as well?
and when you set default to a server side function, it adds now() in the statement but not in the alembic DDL?
The default=func.now() I am very sure because I see it add now() in the statement and i don't see any schema change
The first part I haven't tested myself but ChatGPT does seem to think it doesn't auto add now if you use server_default and it's a schema change
There's a lot of options... Let's split this into 2 different issues. 1) how does sqlalchemy behave and 2) how does alembic behave. Users could have SQLAlchemy alone or both, so both behaviors need to be understood.
we may need to come up with a fix depending on each of these.
I can see the DDL issue. That makes sense because alembic is probably coded to look at that column for "fetched values" which is what this is
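One way to check the SQLAlchemy half of this without a database is to compile the DDL and an INSERT for each variant and compare the strings. A small sketch, assuming the generic default dialect (real dialects may render the function differently); the table names are made up for illustration.

```python
from sqlalchemy import Column, DateTime, Integer, MetaData, Table, func
from sqlalchemy.schema import CreateTable

metadata = MetaData()

# default=: the SQL expression is rendered inline in the INSERT statement.
client_side = Table(
    "client_side",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("created_at", DateTime, default=func.now()),
)

# server_default=: the expression becomes a DEFAULT clause in the DDL.
server_side = Table(
    "server_side",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("created_at", DateTime, server_default=func.now()),
)

client_ddl = str(CreateTable(client_side))
server_ddl = str(CreateTable(server_side))
client_insert = str(client_side.insert().values(id=1))
server_insert = str(server_side.insert().values(id=1))

for label, sql in [
    ("default DDL", client_ddl),
    ("server_default DDL", server_ddl),
    ("default INSERT", client_insert),
    ("server_default INSERT", server_insert),
]:
    print(f"-- {label}\n{sql.strip()}\n")
```

The `default=` variant should leave the CREATE TABLE untouched and put the function call in the INSERT. The `server_default=` variant should do the reverse: a DEFAULT clause in the schema and no mention of the column in the INSERT. That is also why only the second one looks like a schema change.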
Currently file objects are automatically deleted if the corresponding row is deleted, aren't they? Is there a way to avoid this? Currently I have two file objects in my rows, "file_local" and "file_cloud". "file_local" is the first entry point for the data, and each night the data object is copied to another s3 backend with the "file_cloud" object. Now we need to shrink the local database without losing the old data in the cloud, but I'm afraid that the "file_cloud" object is automatically deleted.
ok I could use a raw sql statement I assume 😄
You should be able to disable this listener and leave it turned off. But it may be a bit tricky in a few places.