From cc2cab4e18cfa8fc46b65bdf1f05c1bcece1e999 Mon Sep 17 00:00:00 2001 From: Ivan Ogasawara Date: Wed, 19 Jun 2024 20:07:02 -0400 Subject: [PATCH 1/4] fix: Fix example --- containers/compose.yaml | 2 -- example/tasks/parallel.py | 42 ++++++++++++++++----------------------- example/tasks/serial.py | 42 ++++++++++++++++----------------------- 3 files changed, 34 insertions(+), 52 deletions(-) diff --git a/containers/compose.yaml b/containers/compose.yaml index 1e4388b..e2b49e3 100644 --- a/containers/compose.yaml +++ b/containers/compose.yaml @@ -4,13 +4,11 @@ services: valkey: image: valkey/valkey:7.2.5-alpine hostname: valkey - container_name: valkey ports: - 6379:6379 # celery: # hostname: celery - # container_name: celery # build: # context: .. # dockerfile: containers/celery/Dockerfile diff --git a/example/tasks/parallel.py b/example/tasks/parallel.py index 6b2be2a..b4ca390 100644 --- a/example/tasks/parallel.py +++ b/example/tasks/parallel.py @@ -6,44 +6,36 @@ import celery -from retsu import ResultTask from retsu.celery import ParallelCeleryTask from .config import app, redis_client @app.task -def task_parallel_a1(a: int, b: int, task_id: str) -> int: # type: ignore - """Define the task_a1.""" +def task_parallel_a_plus_b(a: int, b: int, task_id: str) -> int: # type: ignore + """Define the task_parallel_a_plus_b.""" sleep(a + b) - print("running task a1") + print("running task_parallel_a_plus_b") result = a + b - redis_client.set(f"result-{task_id}", result) + redis_client.set(f"parallel-result-a-plus-b-{task_id}", result) return result @app.task -def task_parallel_a2(task_id: str) -> int: # type: ignore - """Define the task_a2.""" - print("running task a2") - result = redis_client.get(f"result-{task_id}") +def task_parallel_result_plus_10(task_id: str) -> int: # type: ignore + """Define the task_parallel_result_plus_10.""" + print("running task_parallel_result_plus_10") + result = redis_client.get(f"parallel-result-a-plus-b-{task_id}") + redis_client.set(f"parallel-result-plus-10-{task_id}", result + 10) return result @app.task -def task_parallel_final(results, task_id: str) -> int: # type: ignore - """Define the final_task.""" - print("running final task") - - result = redis_client.get(f"result-{task_id}") - final_result = f"Final result: {result}" - print(final_result) - - task_result = ResultTask() - - task_result.save(task_id=task_id, result=final_result) - - return final_result +def task_parallel_result_square(results, task_id: str) -> int: # type: ignore + """Define the task_parallel_result_square.""" + print("running task_parallel_result_square") + result = redis_client.get(f"parallel-result-plus-10-{task_id}") + return result**2 class MyParallelTask1(ParallelCeleryTask): @@ -59,8 +51,8 @@ def get_chord_tasks( """Define the list of tasks for celery chord.""" return ( [ - task_parallel_a1.s(a, b, task_id), - task_parallel_a2.s(task_id), + task_parallel_a_plus_b.s(a, b, task_id), + task_parallel_result_plus_10.s(task_id), ], - task_parallel_final.s(task_id), + task_parallel_result_square.s(task_id), ) diff --git a/example/tasks/serial.py b/example/tasks/serial.py index 77377fb..8ad6120 100644 --- a/example/tasks/serial.py +++ b/example/tasks/serial.py @@ -6,44 +6,36 @@ import celery -from retsu import ResultTask from retsu.celery import SerialCeleryTask from .config import app, redis_client @app.task -def task_serial_a1(a: int, b: int, task_id: str) -> int: # type: ignore - """Define the task_a1.""" +def task_serial_a_plus_b(a: int, b: int, task_id: str) -> int: # type: ignore + """Define the task_serial_a_plus_b.""" sleep(a + b) - print("running task a1") + print("running task_serial_a_plus_b") result = a + b - redis_client.set(f"result-{task_id}", result) + redis_client.set(f"serial-result-a-plus-b-{task_id}", result) return result @app.task -def task_serial_a2(task_id: str) -> int: # type: ignore - """Define the task_a2.""" - print("running task a2") - result = redis_client.get(f"result-{task_id}") +def task_serial_result_plus_10(task_id: str) -> int: # type: ignore + """Define the task_serial_result_plus_10.""" + print("running task_serial_result_plus_10") + result = redis_client.get(f"serial-result-a-plus-b-{task_id}") + redis_client.set(f"serial-result-plus-10-{task_id}", result + 10) return result @app.task -def task_serial_final(results, task_id: str) -> int: # type: ignore - """Define the final_task.""" - print("running final task") - - result = redis_client.get(f"result-{task_id}") - final_result = f"Final result: {result}" - print(final_result) - - task_result = ResultTask() - - task_result.save(task_id=task_id, result=final_result) - - return final_result +def task_serial_result_square(results, task_id: str) -> int: # type: ignore + """Define the task_serial_result_square.""" + print("running task_serial_result_square") + result = redis_client.get(f"serial-result-plus-10-{task_id}") + return result**2 class MySerialTask1(SerialCeleryTask): @@ -59,8 +51,8 @@ def get_chord_tasks( """Define the list of tasks for celery chord.""" return ( [ - task_serial_a1.s(a, b, task_id), - task_serial_a2.s(task_id), + task_serial_a_plus_b.s(a, b, task_id), + task_serial_result_plus_10.s(task_id), ], - task_serial_final.s(task_id), + task_serial_result_square.s(task_id), ) From 436070751fc97cda604d83db7019ecbc47122bad Mon Sep 17 00:00:00 2001 From: Ivan Ogasawara Date: Wed, 19 Jun 2024 20:22:38 -0400 Subject: [PATCH 2/4] move examples for a new folder --- .makim.yaml | 12 ------------ examples/README.md | 8 ++++++++ .../redis_queue_between_tasks}/.gitignore | 0 .../redis_queue_between_tasks}/__init__.py | 0 .../redis_queue_between_tasks}/app.py | 0 examples/redis_queue_between_tasks/run.sh | 14 ++++++++++++++ .../redis_queue_between_tasks}/settings.py | 0 .../redis_queue_between_tasks}/tasks/__init__.py | 0 .../redis_queue_between_tasks}/tasks/app.py | 0 .../redis_queue_between_tasks}/tasks/config.py | 0 .../redis_queue_between_tasks}/tasks/parallel.py | 0 .../redis_queue_between_tasks}/tasks/serial.py | 0 pyproject.toml | 2 +- 13 files changed, 23 insertions(+), 13 deletions(-) create mode 100644 examples/README.md rename {example => examples/redis_queue_between_tasks}/.gitignore (100%) rename {example => examples/redis_queue_between_tasks}/__init__.py (100%) rename {example => examples/redis_queue_between_tasks}/app.py (100%) create mode 100755 examples/redis_queue_between_tasks/run.sh rename {example => examples/redis_queue_between_tasks}/settings.py (100%) rename {example => examples/redis_queue_between_tasks}/tasks/__init__.py (100%) rename {example => examples/redis_queue_between_tasks}/tasks/app.py (100%) rename {example => examples/redis_queue_between_tasks}/tasks/config.py (100%) rename {example => examples/redis_queue_between_tasks}/tasks/parallel.py (100%) rename {example => examples/redis_queue_between_tasks}/tasks/serial.py (100%) diff --git a/.makim.yaml b/.makim.yaml index bc4708d..4b2a4f9 100644 --- a/.makim.yaml +++ b/.makim.yaml @@ -56,18 +56,6 @@ groups: run: | pytest ${{ args.path }} ${{ args.params }} - example: - help: Run the example app - shell: bash - run: | - set -ex - sugar build - sugar ext restart --options -d - sleep 5 - cd example/ - celery -A tasks.app worker --loglevel=debug & - python app.py - setup: help: Run the setup for the unit tests shell: bash diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..4fa8fed --- /dev/null +++ b/examples/README.md @@ -0,0 +1,8 @@ +# Examples of usage of Retsu + +This folder contains some examples of usage of Retsu. + +## Using redis as a queue between celery tasks + +The `redis_queue_between_tasks` folder contains an example about how to create +extra queues to establish communication across different celery chord tasks. diff --git a/example/.gitignore b/examples/redis_queue_between_tasks/.gitignore similarity index 100% rename from example/.gitignore rename to examples/redis_queue_between_tasks/.gitignore diff --git a/example/__init__.py b/examples/redis_queue_between_tasks/__init__.py similarity index 100% rename from example/__init__.py rename to examples/redis_queue_between_tasks/__init__.py diff --git a/example/app.py b/examples/redis_queue_between_tasks/app.py similarity index 100% rename from example/app.py rename to examples/redis_queue_between_tasks/app.py diff --git a/examples/redis_queue_between_tasks/run.sh b/examples/redis_queue_between_tasks/run.sh new file mode 100755 index 0000000..53783a8 --- /dev/null +++ b/examples/redis_queue_between_tasks/run.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash + +set -ex + +pushd ../../ + +sugar build +sugar ext restart --options -d +sleep 5 + +popd + +celery -A tasks.app worker --loglevel=debug & +python app.py diff --git a/example/settings.py b/examples/redis_queue_between_tasks/settings.py similarity index 100% rename from example/settings.py rename to examples/redis_queue_between_tasks/settings.py diff --git a/example/tasks/__init__.py b/examples/redis_queue_between_tasks/tasks/__init__.py similarity index 100% rename from example/tasks/__init__.py rename to examples/redis_queue_between_tasks/tasks/__init__.py diff --git a/example/tasks/app.py b/examples/redis_queue_between_tasks/tasks/app.py similarity index 100% rename from example/tasks/app.py rename to examples/redis_queue_between_tasks/tasks/app.py diff --git a/example/tasks/config.py b/examples/redis_queue_between_tasks/tasks/config.py similarity index 100% rename from example/tasks/config.py rename to examples/redis_queue_between_tasks/tasks/config.py diff --git a/example/tasks/parallel.py b/examples/redis_queue_between_tasks/tasks/parallel.py similarity index 100% rename from example/tasks/parallel.py rename to examples/redis_queue_between_tasks/tasks/parallel.py diff --git a/example/tasks/serial.py b/examples/redis_queue_between_tasks/tasks/serial.py similarity index 100% rename from example/tasks/serial.py rename to examples/redis_queue_between_tasks/tasks/serial.py diff --git a/pyproject.toml b/pyproject.toml index 2659708..e3b4378 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -113,4 +113,4 @@ ignore_missing_imports = true warn_unused_ignores = true warn_redundant_casts = true warn_unused_configs = true -exclude = ["example/", "scripts/"] +exclude = ["examples/", "scripts/"] From ddd90f9570c7797ac9ab384cbfdc8ec89e60bf5e Mon Sep 17 00:00:00 2001 From: Ivan Ogasawara Date: Wed, 19 Jun 2024 21:50:38 -0400 Subject: [PATCH 3/4] fix examples --- examples/redis_queue_between_tasks/README.md | 11 ++++ examples/redis_queue_between_tasks/app.py | 58 ++++++++++++++----- .../tasks/parallel.py | 18 ++++-- .../redis_queue_between_tasks/tasks/serial.py | 20 +++++-- src/retsu/celery.py | 58 ++++++++++++++++--- src/retsu/tracking.py | 7 +++ 6 files changed, 141 insertions(+), 31 deletions(-) create mode 100644 examples/redis_queue_between_tasks/README.md diff --git a/examples/redis_queue_between_tasks/README.md b/examples/redis_queue_between_tasks/README.md new file mode 100644 index 0000000..ae240a7 --- /dev/null +++ b/examples/redis_queue_between_tasks/README.md @@ -0,0 +1,11 @@ +# How to test + +You can run `run.sh` in your terminal, and in the web browser you can try the +following endpoints: + +- http://127.0.0.1:5000/serial/10/20 +- http://127.0.0.1:5000/parallel/10/20 +- http://127.0.0.1:5000/serial/result/[TASK_ID] +- http://127.0.0.1:5000/parallel/result/[TASK_ID] + +Remember to replace `[TASK_ID]` by the desired task id. diff --git a/examples/redis_queue_between_tasks/app.py b/examples/redis_queue_between_tasks/app.py index 615e79b..3d8e968 100644 --- a/examples/redis_queue_between_tasks/app.py +++ b/examples/redis_queue_between_tasks/app.py @@ -3,7 +3,8 @@ import os import signal -from typing import Optional +from time import sleep +from typing import Any, Optional from flask import Flask from tasks import MyTaskManager @@ -40,55 +41,84 @@ def api() -> str: * parallel * status * result - """ + + Example of endpoints: + + - http://127.0.0.1:5000/serial/1/2 + - http://127.0.0.1:5000/parallel/1/2 + - http://127.0.0.1:5000/serial/result/[TASK_ID] + - http://127.0.0.1:5000/serial/status/[TASK_ID] + - http://127.0.0.1:5000/parallel/result/[TASK_ID] + - http://127.0.0.1:5000/parallel/status/[TASK_ID] + + Remember to replace `[TASK_ID]` by the desired task id. + """.replace("\n", "
") return menu @app.route("/serial//") -def serial(a: int, b: int) -> str: +def serial(a: int, b: int) -> dict[str, Any]: """Define the serial endpoint.""" task1 = task_manager.get_task("serial") key = task1.request(a=a, b=b) - return f"your task ({key}) is running now, please wait until it is done." + return {"message": f"Your task ({key}) is running now"} @app.route("/parallel//") -def parallel(a: int, b: int) -> str: +def parallel(a: int, b: int) -> dict[str, Any]: """Define the parallel endpoint.""" task2 = task_manager.get_task("parallel") key = task2.request(a=a, b=b) - return f"your task ({key}) is running now, please wait until it is done." + return {"message": f"Your task ({key}) is running now"} @app.route("/serial/status/") -def serial_status(task_id: str) -> str: +def serial_status(task_id: str) -> dict[str, Any]: """Define serial/status endpoint.""" task1 = task_manager.get_task("serial") - _status = task1.status(task_id) + _status = task1.result.status(task_id) return {"status": _status, "task_id": task_id} @app.route("/parallel/status/") -def parallel_status(task_id: str) -> str: +def parallel_status(task_id: str) -> dict[str, Any]: """Define parallel/status endpoint.""" task2 = task_manager.get_task("parallel") - _status = task2.status(task_id) + _status = task2.result.status(task_id) return {"status": _status, "task_id": task_id} @app.route("/serial/result/") -def serial_result(task_id: str) -> str: +def serial_result(task_id: str) -> dict[str, Any]: """Define serial/result endpoint.""" task1 = task_manager.get_task("serial") - return task1.get_result(task_id) + result = None + for _ in range(10): + try: + # note: with no timeout + result = task1.result.get(task_id) + break + except Exception: + sleep(1) + + if result is None: + return {"Error": "Result is not ready yet."} + return {"result": result[0]} @app.route("/parallel/result/") -def parallel_result(task_id: str) -> str: +def parallel_result(task_id: str) -> dict[str, Any]: """Define parallel/result endpoint.""" task2 = task_manager.get_task("parallel") - return task2.get_result(task_id) + + try: + # note: with timeout + result = task2.result.get(task_id, timeout=10) + except Exception: + return {"Error": "Result is not ready yet."} + + return {"result": result[-1]} if __name__ == "__main__": diff --git a/examples/redis_queue_between_tasks/tasks/parallel.py b/examples/redis_queue_between_tasks/tasks/parallel.py index b4ca390..d9615ae 100644 --- a/examples/redis_queue_between_tasks/tasks/parallel.py +++ b/examples/redis_queue_between_tasks/tasks/parallel.py @@ -25,17 +25,25 @@ def task_parallel_a_plus_b(a: int, b: int, task_id: str) -> int: # type: ignore def task_parallel_result_plus_10(task_id: str) -> int: # type: ignore """Define the task_parallel_result_plus_10.""" print("running task_parallel_result_plus_10") - result = redis_client.get(f"parallel-result-a-plus-b-{task_id}") - redis_client.set(f"parallel-result-plus-10-{task_id}", result + 10) - return result + result = None + while result is None: + result = redis_client.get(f"parallel-result-a-plus-b-{task_id}") + sleep(1) + + final_result = int(result) + 10 + redis_client.set(f"parallel-result-plus-10-{task_id}", final_result) + return final_result @app.task def task_parallel_result_square(results, task_id: str) -> int: # type: ignore """Define the task_parallel_result_square.""" print("running task_parallel_result_square") - result = redis_client.get(f"parallel-result-plus-10-{task_id}") - return result**2 + result = None + while result is None: + result = redis_client.get(f"parallel-result-plus-10-{task_id}") + sleep(1) + return int(result) ** 2 class MyParallelTask1(ParallelCeleryTask): diff --git a/examples/redis_queue_between_tasks/tasks/serial.py b/examples/redis_queue_between_tasks/tasks/serial.py index 8ad6120..ce3d02a 100644 --- a/examples/redis_queue_between_tasks/tasks/serial.py +++ b/examples/redis_queue_between_tasks/tasks/serial.py @@ -25,8 +25,14 @@ def task_serial_a_plus_b(a: int, b: int, task_id: str) -> int: # type: ignore def task_serial_result_plus_10(task_id: str) -> int: # type: ignore """Define the task_serial_result_plus_10.""" print("running task_serial_result_plus_10") - result = redis_client.get(f"serial-result-a-plus-b-{task_id}") - redis_client.set(f"serial-result-plus-10-{task_id}", result + 10) + previous_result = None + while previous_result is None: + previous_result = redis_client.get(f"serial-result-a-plus-b-{task_id}") + sleep(1) + + previous_result_int = int(previous_result) + result = previous_result_int + 10 + redis_client.set(f"serial-result-plus-10-{task_id}", result) return result @@ -34,8 +40,14 @@ def task_serial_result_plus_10(task_id: str) -> int: # type: ignore def task_serial_result_square(results, task_id: str) -> int: # type: ignore """Define the task_serial_result_square.""" print("running task_serial_result_square") - result = redis_client.get(f"serial-result-plus-10-{task_id}") - return result**2 + previous_result = None + while previous_result is None: + previous_result = redis_client.get(f"serial-result-plus-10-{task_id}") + sleep(1) + + previous_result_int = int(previous_result) + result = previous_result_int**2 + return result class MySerialTask1(SerialCeleryTask): diff --git a/src/retsu/celery.py b/src/retsu/celery.py index 9674371..24204fa 100644 --- a/src/retsu/celery.py +++ b/src/retsu/celery.py @@ -18,18 +18,30 @@ class CeleryTask: def task(self, *args, task_id: str, **kwargs) -> Any: # type: ignore """Define the task to be executed.""" chord_tasks, chord_callback = self.get_chord_tasks( - *args, task_id=task_id, **kwargs + *args, + task_id=task_id, + **kwargs, + ) + group_tasks = self.get_group_tasks( + *args, + task_id=task_id, + **kwargs, + ) + chain_tasks = self.get_chain_tasks( + *args, + task_id=task_id, + **kwargs, ) - chain_tasks = self.get_chain_tasks(*args, task_id=task_id, **kwargs) # start the tasks if chord_tasks: - if chord_callback: - workflow_chord = chord(chord_tasks, chord_callback) - else: - workflow_chord = group(chord_tasks) + workflow_chord = chord(chord_tasks, chord_callback) promise_chord = workflow_chord.apply_async() + if group_tasks: + workflow_group = group(group_tasks) + promise_group = workflow_group.apply_async() + if chain_tasks: workflow_chain = chain(chord_tasks) promise_chain = workflow_chain.apply_async() @@ -37,10 +49,26 @@ def task(self, *args, task_id: str, **kwargs) -> Any: # type: ignore # wait for the tasks results: list[Any] = [] if chord_tasks: - results.extend(promise_chord.get()) + chord_result = promise_chord.get() + if isinstance(chord_result, list): + results.extend(chord_result) + else: + results.append(chord_result) + + if group_tasks: + group_result = promise_group.get() + if isinstance(group_result, list): + results.extend(group_result) + else: + results.append(group_result) if chain_tasks: - results.append(promise_chain.get()) + chain_result = promise_chain.get() + + if isinstance(chain_result, list): + results.extend(chain_result) + else: + results.append(chain_result) return results @@ -59,6 +87,20 @@ def get_chord_tasks( # type: ignore callback_task = None return (chord_tasks, callback_task) + def get_group_tasks( # type: ignore + self, *args, **kwargs + ) -> list[celery.Signature]: + """ + Run tasks with group. + + Return + ------ + tuple: + list of tasks for the chord, and the task to be used as a callback + """ + group_tasks: list[celery.Signature] = [] + return group_tasks + def get_chain_tasks( # type: ignore self, *args, **kwargs ) -> list[celery.Signature]: diff --git a/src/retsu/tracking.py b/src/retsu/tracking.py index 1b9703a..4b00908 100644 --- a/src/retsu/tracking.py +++ b/src/retsu/tracking.py @@ -119,6 +119,13 @@ def get(self, task_id: str, timeout: Optional[int] = None) -> Any: "Timeout(get): Task result is not ready yet. " f"Task status: {status}" ) + + elif self.status(task_id) != "completed": + status = self.status(task_id) + raise Exception( + "Timeout(get): Task result is not ready yet. " + f"Task status: {status}" + ) result = self.metadata.get(task_id, "result") return pickle.loads(result) if result else result From a00d023eb23312c72825ac0e1446b12fb4179a6d Mon Sep 17 00:00:00 2001 From: Ivan Ogasawara Date: Wed, 19 Jun 2024 21:58:47 -0400 Subject: [PATCH 4/4] fix tests --- tests/test_task_celery_parallel.py | 20 +++++++------------- tests/test_task_celery_serial.py | 20 +++++++------------- 2 files changed, 14 insertions(+), 26 deletions(-) diff --git a/tests/test_task_celery_parallel.py b/tests/test_task_celery_parallel.py index 15b8123..d4ea08e 100644 --- a/tests/test_task_celery_parallel.py +++ b/tests/test_task_celery_parallel.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import Generator, Optional +from typing import Generator import celery import pytest @@ -16,32 +16,26 @@ class MyResultTask(ParallelCeleryTask): """Task for the test.""" - def get_chord_tasks( # type: ignore + def get_group_tasks( # type: ignore self, *args, **kwargs - ) -> tuple[list[celery.Signature], Optional[celery.Signature]]: + ) -> list[celery.Signature]: """Define the list of tasks for celery chord.""" x = kwargs.get("x") y = kwargs.get("y") task_id = kwargs.get("task_id") - return ( - [task_sum.s(x, y, task_id)], - None, - ) + return [task_sum.s(x, y, task_id)] class MyTimestampTask(ParallelCeleryTask): """Task for the test.""" - def get_chord_tasks( # type: ignore + def get_group_tasks( # type: ignore self, *args, **kwargs - ) -> tuple[list[celery.Signature], Optional[celery.Signature]]: + ) -> list[celery.Signature]: """Define the list of tasks for celery chord.""" seconds = kwargs.get("seconds") task_id = kwargs.get("task_id") - return ( - [task_sleep.s(seconds, task_id)], - None, - ) + return [task_sleep.s(seconds, task_id)] @pytest.fixture diff --git a/tests/test_task_celery_serial.py b/tests/test_task_celery_serial.py index 75c0d59..46f6f23 100644 --- a/tests/test_task_celery_serial.py +++ b/tests/test_task_celery_serial.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import Generator, Optional +from typing import Generator import celery import pytest @@ -16,32 +16,26 @@ class MyResultTask(SerialCeleryTask): """Task for the test.""" - def get_chord_tasks( # type: ignore + def get_group_tasks( # type: ignore self, *args, **kwargs - ) -> tuple[list[celery.Signature], Optional[celery.Signature]]: + ) -> list[celery.Signature]: """Define the list of tasks for celery chord.""" x = kwargs.get("x") y = kwargs.get("y") task_id = kwargs.get("task_id") - return ( - [task_sum.s(x, y, task_id)], - None, - ) + return [task_sum.s(x, y, task_id)] class MyTimestampTask(SerialCeleryTask): """Task for the test.""" - def get_chord_tasks( # type: ignore + def get_group_tasks( # type: ignore self, *args, **kwargs - ) -> tuple[list[celery.Signature], Optional[celery.Signature]]: + ) -> list[celery.Signature]: """Define the list of tasks for celery chord.""" seconds = kwargs.get("seconds") task_id = kwargs.get("task_id") - return ( - [task_sleep.s(seconds, task_id)], - None, - ) + return [task_sleep.s(seconds, task_id)] @pytest.fixture