I am trying to design a map-reduce style workflow in Celery where the number of map tasks comes from the result of another task.
These are my tasks:
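from celery import chain, chord
from celery.result import AsyncResult

# `app` is the Celery application instance, defined elsewhere.

@app.task
def emitter():
    return 5

@app.task
def map(num):
    # do something useful here
    return num + 1

@app.task
def reduce(results):
    # put the maps together and do something with the results
    return sum(results)

@app.task
def mapper(n):
    # build one map signature per item and attach reduce as the chord callback
    maps = [map.s(i) for i in range(n)]
    mapreduce = chord(maps)(reduce.s())
    return mapreduce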
The emitter decides how many maps run.
The map is the task that performs an action.
The reduce task aggregates the map results.
The mapper receives the desired number of maps from the emitter, creates the maps, and binds the reduce task to them.
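To make the intent concrete, here is a plain-Python sketch of the data flow I expect, with no Celery involved. The names `map_one`, `reduce_all`, and `expected_workflow` are hypothetical stand-ins for the tasks above, not part of my real code.

```python
def emitter():
    # decides how many maps should run
    return 5

def map_one(num):
    # stand-in for the `map` task
    return num + 1

def reduce_all(results):
    # stand-in for the `reduce` task
    return sum(results)

def mapper(n):
    # fan out n maps, then reduce their results
    return reduce_all([map_one(i) for i in range(n)])

def expected_workflow():
    # the emitter's output becomes the mapper's input
    return mapper(emitter())

print(expected_workflow())  # 15
```

This is the value I would like the chained Celery workflow to hand back.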
If I don't use the emitter and pass a fixed number of maps, like this:
def wf_1():
    # calling mapper directly runs it in this process and returns the chord's AsyncResult
    res = mapper(5)
    print(res.get())
I get the actual result:
result: 15
If I also chain the emitter task, as follows:
def wf_2():
    # split the map and return the id of the tasks
    workflow = chain(emitter.s(), mapper.s())
    res = workflow.apply_async()
    print(res.get())
I get a nested list of ids instead:
[['437a080f-4e26-41a3-a185-e04641eb6b6e', [['12e99e9a-e2c7-4d1c-86b2-4c036c998ea0', None], [[['7d61d958-561f-48d2-b017-879de4001e11', None], None], [['d2565654-9135-4889-8193-2de772dd8603', None], None], [['e3c407f6-fe87-4abe-85db-35c9bff9c511', None], None], [['02af1c7c-6d85-4a23-a26f-62db6719fd84', None], None], [['4627702c-597f-419b-a34c-db002a3594ff', None], None]]]], None]
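To make the shape of this list easier to see, here is a small plain-Python sketch that walks it and collects every id. `collect_ids` is a hypothetical helper, and my guess about what each id refers to (the first one for the reduce result, the five innermost ones for the maps) is only my reading of the output, not something I have confirmed.

```python
def collect_ids(node):
    """Recursively collect every string found in a nested list."""
    if isinstance(node, str):
        return [node]
    if isinstance(node, (list, tuple)):
        ids = []
        for child in node:
            ids.extend(collect_ids(child))
        return ids
    return []  # None (or anything else) carries no id

# The value returned by res.get(), copied from above.
nested = [
    ['437a080f-4e26-41a3-a185-e04641eb6b6e', [
        ['12e99e9a-e2c7-4d1c-86b2-4c036c998ea0', None],
        [
            [['7d61d958-561f-48d2-b017-879de4001e11', None], None],
            [['d2565654-9135-4889-8193-2de772dd8603', None], None],
            [['e3c407f6-fe87-4abe-85db-35c9bff9c511', None], None],
            [['02af1c7c-6d85-4a23-a26f-62db6719fd84', None], None],
            [['4627702c-597f-419b-a34c-db002a3594ff', None], None],
        ],
    ]],
    None,
]

ids = collect_ids(nested)
print(len(ids))               # 7
print(ids[0] == nested[0][0]) # True
```

So the list holds seven ids in total, and the outermost one is the id at index `[0][0]`.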
To get the actual result, I have to build an AsyncResult from the first id in the list returned by res.get(), like this:
def wf_2():
    # split the map and return the id of the tasks
    workflow = chain(emitter.s(), mapper.s())
    res = workflow.apply_async()
    # the outermost id in the nested list
    task_id = res.get()[0][0]
    print(AsyncResult(task_id).get())
Of course, I am trying to avoid doing something like:
def wf_3():
    # blocks on the emitter result before the maps can be created
    n = emitter.apply_async().get()
    res = mapper(n)
    print(res.get())
I don't understand why I am receiving those ids instead of a simple result, or what the correct way to get the result would be.