#Python: Have one @function run several other functions concurrently / asynchronously


flint river

Hi, I'm trying to figure out how best to represent my existing CI pipeline in Dagger. It consists of several build / test / deploy steps, some of which should ideally run in parallel because they are heavily I/O-bound. As a toy example, take my Python pipeline from message #1379046207527325746, which defines the following convenience @function:

    @function
    async def run_pyproject_pipeline(self, pyproject_dir: FilteredPyProjectDir) -> None:
        """
        Run the given Python project through a set of standard CI jobs.
        """
        await self.check_formatting(pyproject_dir)
        await self.lint(pyproject_dir)
        await self.typecheck(pyproject_dir)
        await self.run_unit_tests(pyproject_dir)

Here, the formatting/lint/typecheck/test @functions are executed one after the other. However, I would like to execute them in parallel. The standard way to do that with Python's asyncio would be something like:

    @function
    async def run_pyproject_pipeline(self, pyproject_dir: FilteredPyProjectDir) -> None:
        """
        Run the given Python project through a set of standard CI jobs.
        """
        await asyncio.wait([
            asyncio.create_task(self.check_formatting(pyproject_dir)),
            asyncio.create_task(self.lint(pyproject_dir)),
            asyncio.create_task(self.typecheck(pyproject_dir)),
            asyncio.create_task(self.run_unit_tests(pyproject_dir)),
        ])

This seems to work, but it messes up Dagger's command line output. What is the recommended way to have a given Dagger function execute several subtasks in parallel?

kindred finch

I would use a task group, but assuming that's running concurrently correctly, what specifically messes up the CLI output? Do you mean the steps are interleaved and you want to group them appropriately? If so, the best solution right now is to use custom telemetry spans: https://docs.dagger.io/cookbook/#create-custom-spans

flint river

Thanks for the suggestion, asyncio.TaskGroup seems to be doing the trick! It now prints the full list of steps and traces, whereas the previous approach using asyncio.wait() only printed:

$ dagger call -m . run-pyproject-pipeline --pyproject-dir .
✔ connect 0.3s
✔ load module 5.6s
✔ parsing command line arguments 0.0s

✔ Host.directory(exclude: [".git", "**/.venv"], path: "/home/user/repos/myrepo/myproject"): Directory! 0.2s

✔ myPipeline: MyPipeline! 1.3s
✔ .runPyprojectPipeline(
│ │ pyprojectDir: Host.directory(exclude: [".git", "**/.venv"], path: "/home/user/repos/myrepo/myproject"): Directory!
│ ): Void 19.3s

Setup tracing at https://dagger.cloud/traces/setup. To hide set DAGGER_NO_NAG=1

A new release of dagger is available: v0.18.8 → v0.18.9
To upgrade, see https://docs.dagger.io/install
https://github.com/dagger/dagger/releases/tag/v0.18.9

$

It actually prints a lot more while it is running (all the details and timestamps for every step and call it executes), but all of that output disappears again on exit. With a TaskGroup this does not happen: all the logs, including error messages, are preserved as expected. Not sure what's going on there.
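One property of asyncio.wait() worth keeping in mind, independent of the display issue (whose cause is not established in this thread): it returns the sets of done and pending tasks and never re-raises a task's exception, so a failing step does not make the surrounding function fail unless each task is inspected. A minimal sketch with hypothetical stand-in coroutines:

```python
import asyncio


async def failing_step() -> None:
    # Hypothetical stand-in for a CI step that fails.
    raise RuntimeError("lint failed")


async def ok_step() -> str:
    # Hypothetical stand-in for a CI step that succeeds.
    await asyncio.sleep(0.01)
    return "ok"


async def wait_and_collect() -> list[BaseException]:
    tasks = [asyncio.create_task(failing_step()), asyncio.create_task(ok_step())]
    # asyncio.wait() returns (done, pending) and does not raise here,
    # even though one of the tasks failed.
    done, pending = await asyncio.wait(tasks)
    # Failures only surface when each finished task is inspected explicitly.
    return [t.exception() for t in done if t.exception() is not None]


if __name__ == "__main__":
    print(asyncio.run(wait_and_collect()))
```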


Update: Unfortunately, TaskGroup doesn't do exactly what I need. From the docstring of asyncio.TaskGroup:

    Any exceptions other than `asyncio.CancelledError` raised within
    a task will cancel all remaining tasks and wait for them to exit.

However, I need all tasks to keep running until each of them either completes or raises an exception.

flint river

Another update: It looks like the following does what I need:

        await run_in_parallel(
            self.check_formatting(pyproject_dir),
            self.lint(pyproject_dir),
            self.typecheck(pyproject_dir),
            self.run_unit_tests(pyproject_dir),
        )

where run_in_parallel() is defined as

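    import asyncio

    async def run_in_parallel(*awaitables) -> None:
        # return_exceptions=True lets every awaitable run to completion
        # instead of propagating the first failure immediately.
        results = await asyncio.gather(*awaitables, return_exceptions=True)

        # Re-raise the first failure in argument order; later ones are dropped.
        for result in results:
            if isinstance(result, Exception):
                raise result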