#Issue with querying data from multiple tables

tranquil grail
#

Hey, there. I'm using the domain pattern from litestar-fullstack and I'm seeing some behavior I don't fully understand after digging through the code. Wondering if anyone can help.

I've got the following code (obfuscated):

within_wait_period = select(N.s_id).where(N.wt > datetime.now(timezone('UTC')) - timedelta(minutes=self.wait_period_in_minutes))

s_query = select(NS, E.details).join(E, E.d_id == e_id).where(NS.type == type_, NS.j.in_(js), NS.s_id.not_in(within_wait_period)).distinct(tuple_(NS.email, NS.phone))

return await self.list(statement=s_query)

This gives me typing errors, since it returns more than just list[NS], which is what the repository/service pattern typically expects. That's OK for now, though I'd be curious whether there's a better, more recommended pattern for handling data that spans more than the one table mapped to the service, while still using the advanced-alchemy code.

It generates the following query:

select distinct on ((ns.email, ns.phone)) ns.s_id, ...<other_columns>, e.details from ns join e on e.e_id = $1::VARCHAR where ns.type = <other> and ns.j in (...) and ns.s_id not in (select n.s_id from n where n.wt > ...)

This query seems correct. However, when I get the data back, it's a list of NS records, and the E.details column is lost. Just curious if there's a pattern I should be using when joining tables like this, or any thoughts as to what might be causing the column to be dropped in the select function.

vagrant charm
#

How does E.details relate to the NS model? You'll want to make sure that relationship is included in the query options on this statement.

#

That said, I may have an alternative for you.

#

So, I came up with a way to represent an arbitrary SQL query as an ORM object.

#

Here's a new Declarative Base you probably want to declare for these types of objects


class SQLQuery(DeclarativeBase):
    """Base for all SQLAlchemy custom mapped objects."""

    __allow_unmapped__ = True
    registry = orm_registry

    def to_dict(self, exclude: set[str] | None = None) -> dict[str, Any]:
        """Convert model to dictionary.

        Returns:
            dict[str, Any]: A dict representation of the model
        """
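        # union() takes iterables, so wrap the name in a set; a bare string would be split into characters.
        exclude = exclude.union({"_sentinel"}) if exclude else {"_sentinel"}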
        return {field.name: getattr(self, field.name) for field in self.__table__.columns if field.name not in exclude}
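
One thing to watch in the exclude handling of to_dict: set.union accepts any iterable, so a bare string is consumed character by character. Wrapping the name in a set keeps it as a single entry. A quick plain-Python check (the exclude value here is just an example):

```python
exclude = {"id"}

# Passing a bare string: union() iterates it character by character.
by_chars = exclude.union("_sentinel")

# Passing a one-element set: the whole name is added as a single entry.
by_name = exclude.union({"_sentinel"})

print("_sentinel" in by_chars)
print("_sentinel" in by_name)
```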
#

Then you can do something like this:

#

class UserTeamCount(MappedAsDataclass, SQLQuery):
    """Member Count."""

    __table__ = (
        sa.select(User.id.label("user_id"), sa.func.count(TeamMember.id).label("active_teams"))
        .join_from(User, TeamMember, onclause=User.id == TeamMember.user_id, isouter=True)
        .group_by(User.id)
    ).alias("user_workspace_count")
    user_id: UUID
    active_teams: int
#

You can then treat this like any ORM relationship...

#

Here are also some custom repo/service classes for using this directly

#

And finally, the service for it

#

class SQLAlchemyAsyncRepositoryQueryService:
    """Service object that operates on a repository object."""

    def __init__(self, **repo_kwargs: Any) -> None:
        """Configure the service object.

        Args:
            **repo_kwargs: passed as keyword args to repo instantiation.
        """
        self.repository = SQLAlchemyQueryRepository(**repo_kwargs)

    @staticmethod
    def find_filter(filter_type: type[FilterTypeT], *filters: FilterTypes) -> FilterTypeT | None:
        """Get the filter specified by filter type from the filters.

        Args:
            filter_type: The type of filter to find.
            *filters: filter types to apply to the query

        Returns:
            The match filter instance or None
        """
        for filter_ in filters:
            if isinstance(filter_, filter_type):
                return cast("FilterTypeT | None", filter_)
        return None

    @classmethod
    @contextlib.asynccontextmanager
    async def new(
        cls: type[SQLAlchemyAsyncRepositoryQueryServiceT],
        session: AsyncSession | None = None,
    ) -> AsyncIterator[SQLAlchemyAsyncRepositoryQueryServiceT]:
        """Context manager that returns instance of service object.

        Handles construction of the database session.

        Returns:
            The service object instance.
        """
        if session:
            yield cls(session=session)
        else:
            async with async_session_factory() as db_session:
                yield cls(session=db_session)

    @overload
    def to_schema(self, dto: type[ModelDTOT], data: ModelT | RowMapping) -> ModelDTOT:
        ...

    @overload
    def to_schema(
        self,
        dto: type[ModelDTOT],
        data: Sequence[ModelT] | list[RowMapping],
        total: int | None = None,
        *filters: FilterTypes,
    ) -> OffsetPagination[ModelDTOT]:
        ...

    def to_schema(
        self,
        dto: type[ModelDTOT],
        data: ModelT | Sequence[ModelT] | list[RowMapping] | RowMapping,
        total: int | None = None,
        *filters: FilterTypes,
    ) -> ModelDTOT | OffsetPagination[ModelDTOT]:
        """Convert the object to a response schema.

        Args:
            dto: The DTO type to convert the data into.
            data: The return from one of the service calls.
            total: the total number of rows in the data
            *filters: Collection route filters.

        Returns:
            The data converted to the DTO type, wrapped in OffsetPagination when given a sequence.
        """
        if not isinstance(data, Sequence | list):
            return TypeAdapter(dto).validate_python(data)
        limit_offset = self.find_filter(LimitOffset, *filters)
        total = total if total else len(data)
        limit_offset = limit_offset if limit_offset is not None else LimitOffset(limit=len(data), offset=0)
        return OffsetPagination[dto](  # type: ignore[valid-type]
            items=TypeAdapter(list[dto]).validate_python(data),  # type: ignore[valid-type]
            limit=limit_offset.limit,
            offset=limit_offset.offset,
            total=total,
        )
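
To make the pagination fallback in to_schema concrete, here's a stripped-down sketch in plain Python. LimitOffset and SearchFilter below are hypothetical stand-ins for the real filter types, and page_info is a made-up helper that returns a dict instead of an OffsetPagination:

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Sequence


@dataclass
class LimitOffset:  # hypothetical stand-in for the real filter type
    limit: int
    offset: int


@dataclass
class SearchFilter:  # hypothetical second filter, to show the lookup skips it
    field_name: str
    value: str


def find_filter(filter_type: type, *filters: Any) -> Any | None:
    """Return the first filter that is an instance of filter_type, else None."""
    for filter_ in filters:
        if isinstance(filter_, filter_type):
            return filter_
    return None


def page_info(data: Sequence[Any], total: int | None = None, *filters: Any) -> dict[str, int]:
    """Mirror the fallback: without a LimitOffset, report one page holding everything."""
    limit_offset = find_filter(LimitOffset, *filters)
    if limit_offset is None:
        limit_offset = LimitOffset(limit=len(data), offset=0)
    return {
        "limit": limit_offset.limit,
        "offset": limit_offset.offset,
        "total": total if total else len(data),
    }


rows = ["a", "b", "c"]
unpaged = page_info(rows)
paged = page_info(rows, 42, SearchFilter("name", "x"), LimitOffset(limit=3, offset=6))
```

With no LimitOffset filter, everything is reported as a single page starting at offset 0. With one, its limit and offset are passed through and the supplied total is kept.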
#

This will probably get added to advanced-alchemy in some form.

tranquil grail
#

Thanks! I'm realizing I may be able to go with a different approach here that's a bit simpler given the data structure, but I might include this for flexibility regardless 🙂. I look forward to it being added to advanced-alchemy, too!

#

!solve