Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions .makim.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,6 @@ groups:
run: |
pytest ${{ args.path }} ${{ args.params }}

example:
help: Run the example app
shell: bash
run: |
set -ex
sugar build
sugar ext restart --options -d
sleep 5
cd example/
celery -A tasks.app worker --loglevel=debug &
python app.py

setup:
help: Run the setup for the unit tests
shell: bash
Expand Down
2 changes: 0 additions & 2 deletions containers/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ services:
valkey:
image: valkey/valkey:7.2.5-alpine
hostname: valkey
container_name: valkey
ports:
- 6379:6379

# celery:
# hostname: celery
# container_name: celery
# build:
# context: ..
# dockerfile: containers/celery/Dockerfile
Expand Down
66 changes: 0 additions & 66 deletions example/tasks/parallel.py

This file was deleted.

66 changes: 0 additions & 66 deletions example/tasks/serial.py

This file was deleted.

8 changes: 8 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Examples of usage of Retsu

This folder contains some examples of usage of Retsu.

## Using redis as a queue between celery tasks

The `redis_queue_between_tasks` folder contains an example about how to create
extra queues to establish communication across different celery chord tasks.
File renamed without changes.
11 changes: 11 additions & 0 deletions examples/redis_queue_between_tasks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# How to test

You can run `run.sh` in your terminal, and in the web browser you can try the
following endpoints:

- http://127.0.0.1:5000/serial/10/20
- http://127.0.0.1:5000/parallel/10/20
- http://127.0.0.1:5000/serial/result/[TASK_ID]
- http://127.0.0.1:5000/parallel/result/[TASK_ID]

Remember to replace `[TASK_ID]` by the desired task id.
File renamed without changes.
58 changes: 44 additions & 14 deletions example/app.py → examples/redis_queue_between_tasks/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import os
import signal

from typing import Optional
from time import sleep
from typing import Any, Optional

from flask import Flask
from tasks import MyTaskManager
Expand Down Expand Up @@ -40,55 +41,84 @@ def api() -> str:
* parallel
* status
* result
"""

Example of endpoints:

- http://127.0.0.1:5000/serial/1/2
- http://127.0.0.1:5000/parallel/1/2
- http://127.0.0.1:5000/serial/result/[TASK_ID]
- http://127.0.0.1:5000/serial/status/[TASK_ID]
- http://127.0.0.1:5000/parallel/result/[TASK_ID]
- http://127.0.0.1:5000/parallel/status/[TASK_ID]

Remember to replace `[TASK_ID]` by the desired task id.
""".replace("\n", "<br/>")

return menu


@app.route("/serial/<int:a>/<int:b>")
def serial(a: int, b: int) -> str:
def serial(a: int, b: int) -> dict[str, Any]:
"""Define the serial endpoint."""
task1 = task_manager.get_task("serial")
key = task1.request(a=a, b=b)
return f"your task ({key}) is running now, please wait until it is done."
return {"message": f"Your task ({key}) is running now"}


@app.route("/parallel/<int:a>/<int:b>")
def parallel(a: int, b: int) -> str:
def parallel(a: int, b: int) -> dict[str, Any]:
"""Define the parallel endpoint."""
task2 = task_manager.get_task("parallel")
key = task2.request(a=a, b=b)
return f"your task ({key}) is running now, please wait until it is done."
return {"message": f"Your task ({key}) is running now"}


@app.route("/serial/status/<string:task_id>")
def serial_status(task_id: str) -> str:
def serial_status(task_id: str) -> dict[str, Any]:
"""Define serial/status endpoint."""
task1 = task_manager.get_task("serial")
_status = task1.status(task_id)
_status = task1.result.status(task_id)
return {"status": _status, "task_id": task_id}


@app.route("/parallel/status/<string:task_id>")
def parallel_status(task_id: str) -> str:
def parallel_status(task_id: str) -> dict[str, Any]:
"""Define parallel/status endpoint."""
task2 = task_manager.get_task("parallel")
_status = task2.status(task_id)
_status = task2.result.status(task_id)
return {"status": _status, "task_id": task_id}


@app.route("/serial/result/<string:task_id>")
def serial_result(task_id: str) -> str:
def serial_result(task_id: str) -> dict[str, Any]:
"""Define serial/result endpoint."""
task1 = task_manager.get_task("serial")
return task1.get_result(task_id)
result = None
for _ in range(10):
try:
# note: with no timeout
result = task1.result.get(task_id)
break
except Exception:
sleep(1)

if result is None:
return {"Error": "Result is not ready yet."}
return {"result": result[0]}


@app.route("/parallel/result/<string:task_id>")
def parallel_result(task_id: str) -> str:
def parallel_result(task_id: str) -> dict[str, Any]:
"""Define parallel/result endpoint."""
task2 = task_manager.get_task("parallel")
return task2.get_result(task_id)

try:
# note: with timeout
result = task2.result.get(task_id, timeout=10)
except Exception:
return {"Error": "Result is not ready yet."}

return {"result": result[-1]}


if __name__ == "__main__":
Expand Down
14 changes: 14 additions & 0 deletions examples/redis_queue_between_tasks/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env bash

set -ex

pushd ../../

sugar build
sugar ext restart --options -d
sleep 5

popd

celery -A tasks.app worker --loglevel=debug &
python app.py
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
66 changes: 66 additions & 0 deletions examples/redis_queue_between_tasks/tasks/parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""My retsu tasks."""

from __future__ import annotations

from time import sleep

import celery

from retsu.celery import ParallelCeleryTask

from .config import app, redis_client


@app.task
def task_parallel_a_plus_b(a: int, b: int, task_id: str) -> int: # type: ignore
"""Define the task_parallel_a_plus_b."""
sleep(a + b)
print("running task_parallel_a_plus_b")
result = a + b
redis_client.set(f"parallel-result-a-plus-b-{task_id}", result)
return result


@app.task
def task_parallel_result_plus_10(task_id: str) -> int: # type: ignore
"""Define the task_parallel_result_plus_10."""
print("running task_parallel_result_plus_10")
result = None
while result is None:
result = redis_client.get(f"parallel-result-a-plus-b-{task_id}")
sleep(1)

final_result = int(result) + 10
redis_client.set(f"parallel-result-plus-10-{task_id}", final_result)
return final_result


@app.task
def task_parallel_result_square(results, task_id: str) -> int: # type: ignore
"""Define the task_parallel_result_square."""
print("running task_parallel_result_square")
result = None
while result is None:
result = redis_client.get(f"parallel-result-plus-10-{task_id}")
sleep(1)
return int(result) ** 2


class MyParallelTask1(ParallelCeleryTask):
"""MyParallelTask1."""

def request(self, a: int, b: int) -> str:
"""Receive the request for processing."""
return super().request(a=a, b=b)

def get_chord_tasks(
self, a: int, b: int, task_id: str
) -> list[celery.Signature]:
"""Define the list of tasks for celery chord."""
return (
[
task_parallel_a_plus_b.s(a, b, task_id),
task_parallel_result_plus_10.s(task_id),
],
task_parallel_result_square.s(task_id),
)
Loading