I am trying to design a straightforward workflow in Celery: a father task that produces some data and returns it, and N different child tasks that each process the same data coming from the father and return their own result.
As a bonus, I would like to process all the children's results together.
This is what I have designed so far:
```python
from celery import Celery, chain, group
from datetime import datetime, UTC

app = Celery('tasks', backend='rpc://', broker='pyamqp://guest@localhost//')


def _get_data():
    return 1, 2


def sign(name, result):
    return {
        'name': name,
        'result': result,
        'time': datetime.now(tz=UTC).isoformat()
    }


@app.task()
def children_1(x, y):
    signature = "children_1"
    result = x + y  # do some processing with input data
    return sign(signature, result)


@app.task
def children_2(x, y):
    signature = "children_2"
    result = x - y  # do some processing with input data
    return sign(signature, result)


@app.task
def closer(**kwargs):
    return {
        'result': kwargs,
        'time': datetime.now(tz=UTC).isoformat()
    }


@app.task
def father():
    x, y = _get_data()  # fetch data from somewhere
    x += 1  # do some processing
    y += 2  # do some processing
    # return x, y  # gets same error
    return {'x': x, 'y': y}


def item_creation_workflow():
    print("Workflow started")
    wf = chain(
        father.s(),
        group(
            children_1.s(),
            children_2.s()
        ),
        closer.s()
    )
    res = wf.apply_async().get(timeout=1)
    print(res)


if __name__ == '__main__':
    item_creation_workflow()
    print("Started")
```
The error I keep getting is:

```
TypeError: children_1() missing 1 required positional argument: 'y'
```
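A plain-Python model may help show where this `TypeError` comes from. A Celery chain passes each task's return value as the *first positional argument* of the next signature, and when the next step is a group, every task in the group receives that same value. So `children_1` is called with the whole dict as `x` and nothing for `y`. Returning `x, y` instead does not help, because the tuple is still a single argument. The helper `call_like_chain` below is hypothetical, not a Celery API; it only imitates that prepending rule.

```python
# Plain-Python model of how a Celery chain feeds one task's return value
# into the next step. No Celery involved; `call_like_chain` is hypothetical.


def father():
    # Same shape as the father task above: a single dict.
    return {'x': 2, 'y': 4}


def children_1(x, y):
    # Same signature as the child task above.
    return x + y


def call_like_chain(task, parent_result, *partial_args):
    # The parent's result is prepended as the FIRST positional argument,
    # followed by any arguments already bound in the signature (none here).
    return task(parent_result, *partial_args)


if __name__ == '__main__':
    try:
        call_like_chain(children_1, father())
    except TypeError as exc:
        # The whole dict lands in `x`, so nothing is left for `y`.
        print(exc)
```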
I cannot figure out how to simply pass the results along. The docs don't seem to describe this use case, even though in a simple chain the result of one task is passed on to the next linked task.
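A sketch of a task shape that seems to fit Celery's argument-passing rule, modeled in plain Python so it runs without a broker. Each child takes a single argument (the father's dict) and unpacks it itself. `closer` also takes a single positional argument: when a group is followed by another task in a chain, Celery turns it into a chord, and the chord callback receives the list of the group's results, so `closer(**kwargs)` would fail next. In the real code the functions would keep their `@app.task` decorators; `run_workflow_model` is a hypothetical stand-in for the canvas, not part of Celery, and this has not been checked against a running broker and result backend.

```python
# Plain-Python model of:
#   chain(father.s(), group(children_1.s(), children_2.s()), closer.s())
# `run_workflow_model` is a hypothetical stand-in, not a Celery API.
from datetime import datetime, timezone


def sign(name, result):
    return {
        'name': name,
        'result': result,
        'time': datetime.now(tz=timezone.utc).isoformat(),
    }


def father():
    x, y = 1, 2  # stands in for _get_data()
    x += 1
    y += 2
    return {'x': x, 'y': y}


def children_1(data):
    # One parameter: the father's whole return value.
    return sign("children_1", data['x'] + data['y'])


def children_2(data):
    return sign("children_2", data['x'] - data['y'])


def closer(results):
    # A chord callback gets ONE positional argument: the list of group results.
    return {
        'result': results,
        'time': datetime.now(tz=timezone.utc).isoformat(),
    }


def run_workflow_model(first, group_tasks, callback):
    parent_result = first()
    # Every task in the group receives the same parent result as its first argument.
    group_results = [task(parent_result) for task in group_tasks]
    # The callback receives the group's results as a single list, in group order.
    return callback(group_results)


if __name__ == '__main__':
    print(run_workflow_model(father, [children_1, children_2], closer))
```

With this shape the `chain(...)` definition itself would not need to change; only the task signatures do.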