#Terminate gracefully

1 messages · Page 1 of 1 (latest)

swift coral
#

Can you share a bit of your function where this error is raised from?

vale mica
#

yes let me prepare that

#

(sorry laptop crash I lost my message I reprepare that)

#

I have a common module for my python pipelines:

@object_type
class TestResult:
    report: dagger.File = field()
    exit_code: str = field()

@object_type
class Python:
    @function
    async def test(
            self,
            services: Annotated[list[dagger.Service], Doc("Source directory")] | None,
            workdir: Annotated[str, Doc("Working directory where to run py.test")] | None,
            fail_fast: Annotated[bool, Doc("Define if tests should fail fast or not")] = field(default=False),
    ) -> TestResult:
        """Execute tests"""

        ctr = self.__ctr

        pytest_cmd = "py.test tests/"
        if fail_fast is True:
            pytest_cmd += " -x"

        test_cmd = ["sh", "-c", f"{pytest_cmd} > /report.txt; echo -n $? > /exit_code", ]

        if services:
            for i in services:
                ctr = (
                    self
                    .__ctr
                    .with_service_binding(await i.hostname(), i)
                )

        if workdir:
            ctr = self.__ctr.with_workdir(workdir)

        ctr = await (
            ctr
            .with_exec(["poetry", "install", "--no-root"])
            .with_exec(test_cmd)
            .sync()
        )

        # save report for inspection.
        report = ctr.file("/report.txt")

        # use the saved exit code to determine if the tests passed.
        exit_code = await ctr.file("/exit_code").contents()

        return TestResult(report=report, exit_code=exit_code)
#

Then I have a pipeline for airflow dags which use the previous function:

@object_type
class Airflowdags:
    source: Annotated[dagger.Directory, Doc("Source directory"), DefaultPath("/airflow-dags"), Ignore([".git", "*.env", ".github", ".idea"])]
    pipeline_id: Annotated[str, Doc("Kind of namespace between different projects")]
    aws_creds: Annotated[dagger.Directory | None, Doc("Define the path to aws credentials folder")]
    aws_profile: Annotated[str, Doc("Define the aws profile to use")] = field(default="shared-product")

    def __post_init__(self):
        pass

    @function
    async def test(
            self,
            dag_name: Annotated[str, Doc("The DAG name to target")],
    ) -> str:
        """Returns a container that echoes whatever string argument is provided"""

        config_backend = dag.secret_manager().aws().with_profile(name=self.aws_profile)

        if self.aws_creds:
            config_backend = config_backend.with_credentials_folder(aws_folder=self.aws_creds)

        l =  await(
            dag
            .python(
                self.pipeline_id,
                self.source,
            )
            .with_discover_python_version()
            .with_python_path()
            .with_secret_dotenv(
                # Pros: load specific env vars for a dag Cons all env var are treated as secret so they are obfuscated in the stdout
                content=config_backend.get_secret(f"CI/AIRFLOW_{dag_name.upper()}_CONF")
            )
            .test(
                workdir=dag_name,
            )
        )
#

One thing is wrong in my second snippet is the return code I'm struggling a little to find the syntax for the return type but if I'm putting None and return nothing but just trying to capture the await in my last function I have the same issue

#

when I'm callinf directly my python module it seems to work

#

could it be because from my airflowdags the await is not good

swift coral
# vale mica could it be because from my airflowdags the await is not good

Yeah, there's just confusion around client vs server code here.

When you define a function in a module, you're basically expanding the Dagger API, so your function implementation basically becomes a resolver at the server side of the API. When your Airflowdags module depends on your Python module, while loading Airflowdags, the engine loads Python first, extending the API with Python's functions, but also generating client bindings in Airflowdags generated client library code (under sdk), to be able to talk to Python.

In other words, when you use dag.python() from Airflowdags, dag.python() isn't returning the Python object that you have in the Python module directly. It returns a class that knows how to make client API calls to that module.

As with any method in a dag... chain, you should only use await when a method shows as returning a Python Coroutine[..., <return type>]. This only happens when a function returns a simple value, like a str, int, or bool. Since your Python.test function returns a TestResult object, it must produce a method that continues to build a query (lazy), to enable continued chaining. You must then select a scalar value to allow execution.

You can keep your await, just add another .exit_code() after .test(...). Then you'll query the exit code directly:

l = await dag.python(xxx)...test(yyy).exit_code()

If you need both values, remove the await to save a lazy instance of the query building client, and await for both values separately:

l = dag.python(xxx)...test(yyy)
exit_code = await l.exit_code()
report = await l.report().contents()

Note that dagger.File is also an object, so you have to select a function from it that returns a simple value as well.

vale mica
#

ok thank you I see I will try tomorrow I'm out of the office
I keep you inform

vale mica
#

I begin to have something interesting but one thing I don't find how to write it.
If my python function have 2 fields report and exit_code and at the end setting at the level of Airflowdags the TestReport object it's working but I would like to have the TestReport at the level of my python function but I don't know if I can return from Airflowdags the PythonTestReport

#

from my python function:

    @function
    async def test(
            self,
            services: Annotated[list[dagger.Service], Doc("Source directory")] | None,
            workdir: Annotated[str, Doc("Working directory where to run py.test")] | None,
            fail_fast: Annotated[bool, Doc("Define if tests should fail fast or not")] = field(default=False),
    ) -> TestResult:
        """Execute tests"""

        ctr = self.__ctr

        pytest_cmd = "py.test tests/"
        if fail_fast is True:
            pytest_cmd += " -x"

        test_cmd = ["sh", "-c", f"{pytest_cmd} > /report.txt; echo -n $? > /exit_code", ]

        if services:
            for i in services:
                ctr = (
                    self
                    .__ctr
                    .with_service_binding(await i.hostname(), i)
                )

        if workdir:
            ctr = self.__ctr.with_workdir(workdir)

        ctr = await (
            ctr
            .with_exec(["poetry", "install", "--no-root"])
            .with_exec(test_cmd)
            .sync()
        )

        # # save report for inspection.
        # self.test_report = ctr.file("/report.txt")
        #
        # # use the saved exit code to determine if the tests passed.
        # self.test_exit_code = await ctr.file("/exit_code").contents()

        return TestResult(report=ctr.file("/report.txt"), exit_code=await ctr.file("/exit_code").contents())
#

from Airflowdags

    @function
    async def test(
            self,
            dag_name: Annotated[str, Doc("The DAG name to target")],
    ) -> PythonTestResult:
        """Returns a container that echoes whatever string argument is provided"""

        config_backend = dag.secret_manager().aws().with_profile(name=self.aws_profile)

        if self.aws_creds:
            config_backend = config_backend.with_credentials_folder(aws_folder=self.aws_creds)

        # test_run = (
        #     dag
        #     .python(
        #         self.pipeline_id,
        #         self.source,
        #     )
        #     .with_discover_python_version()
        #     .with_python_path()
        #     .with_secret_dotenv(
        #         # Pros: load specific env vars for a dag Cons all env var are treated as secret so they are obfuscated in the stdout
        #         content=config_backend.get_secret(f"CI/AIRFLOW_{dag_name.upper()}_CONF")
        #     )
        #     .test(
        #         workdir=dag_name,
        #     )
        # )
        #
        # return TestResult(
        #     report=test_run.test_report(),
        #     exit_code= await test_run.test_exit_code()
        # )

        return await (
            dag
            .python(
                self.pipeline_id,
                self.source,
            )
            .with_discover_python_version()
            .with_python_path()
            .with_secret_dotenv(
                # Pros: load specific env vars for a dag Cons all env var are treated as secret so they are obfuscated in the stdout
                content=config_backend.get_secret(f"CI/AIRFLOW_{dag_name.upper()}_CONF")
            )
            .test(
                workdir=dag_name,
            )
        )
#

I tried to import PythonTestResult doing that: from dagger.client.gen import PythonTestResult but I have this error: Error: input: module.withSource.initialize resolve: failed to initialize module: failed to add object to module "airflowdags": failed to validate type def: object "Airflowdags" function "test" cannot return external type from dependency module "python"

#

so I'm not sure if I can do what I'm trying to do

swift coral