I want to split a Nextflow workflow into separate chunks (vehicle types), split each chunk further (vehicles), and then report as soon as all workers for a given vehicle type are done.
Currently, running nextflow main.nf produces:
N E X T F L O W ~ version 24.04.2
Launching `nextflow_pipeline/main.nf` [amazing_bell] DSL2 - revision: aae9cee085
executor > local (11)
[6e/28da8a] start | 1 of 1 ✔
[36/743eea] step1 (2) | 2 of 2 ✔
[cd/50083f] step2 (6) | 6 of 6 ✔
[5f/a823c3] step3 (1) | 2 of 2 ✔
step2: boats1
step2: boats2
step2: cars2
step2: cars3
step2: cars1
step2: cars4
all cars are done
all boats are done
What I would like instead is:
N E X T F L O W ~ version 24.04.2
Launching `nextflow_pipeline/main.nf` [amazing_bell] DSL2 - revision: aae9cee085
executor > local (11)
[6e/28da8a] start | 1 of 1 ✔
[36/743eea] step1 (2) | 2 of 2 ✔
[cd/50083f] step2 (6) | 6 of 6 ✔
[5f/a823c3] step3 (1) | 2 of 2 ✔
step2: boats1
step2: boats2
all boats are done
step2: cars2
step2: cars3
step2: cars1
step2: cars4
all cars are done
Minimal reproducible example:
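process start {
    output:
    path "vehicle_types.list", emit: vehicle_type

    // "\\n" is doubled: in a Nextflow (Groovy) script string a single "\n"
    // becomes a real newline, which would break the Python string literal
    """
    #!/usr/bin/python3
    with open('vehicle_types.list', 'w') as fout:
        for vehicle in ['boats', 'cars']:
            fout.write(vehicle + "\\n")
    """
}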
process step1 {
    debug true

    input:
    val vehicle_type

    output:
    path "vehicles.list", emit: vehicles

    """
    #!/usr/bin/python3
    import time
    if "${vehicle_type}" == 'boats':
        n = 2
    else:
        n = 4
    with open('vehicles.list', 'w') as fout:
        for i in range(n):
            time.sleep(2)
            fout.write("${vehicle_type}" + str(i + 1) + "\\n")
    """
}
process step2 {
    debug true

    input:
    val vehicle

    output:
    path "not_needed.txt", emit: hacky_output

    """
    echo step2: ${vehicle}
    echo hello > not_needed.txt
    """
}
process step3 {
    debug true

    input:
    val vehicle_type
    val hacky_variable_to_ensure_step2_completed

    """
    echo "all ${vehicle_type} are done"
    """
}
workflow {
    main:
    start()
    start.out.vehicle_type.splitText().map { it -> it.trim() }.set { vehicle_type }
    step1(vehicle_type)
    step1.out.vehicles.splitText().map { it -> it.trim() }.set { vehicles }
    step2(vehicles)
    step3(vehicle_type, step2.out.hacky_output.last())
}
Currently step3 is blocked by .last(). I want to combine the results from step2 in a non-blocking way, as soon as all the workers for a specific vehicle type are done, instead of waiting until every step2 worker has finished. How do I achieve this?
It sounds like you're looking for the collect() operator, if you want step3 to not even start until all of step2 is complete.
This may work:
step3(vehicle_type, step2.out.hacky_output.collect())
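Note that collect(), like last(), only emits once the whole step2 output channel has closed, so on its own it still reports boats and cars together at the end. For per-type reporting, Nextflow's groupKey(key, size) is the usual tool: if each vehicle is tagged with groupKey(vehicle_type, n), where n is the number of lines in that type's vehicles.list, and the key is carried through step2 as part of a tuple, then groupTuple() on step2's output emits each type's group as soon as its n items have arrived. The block below is not Nextflow code; it is a plain-Python model of that emission rule, assuming a made-up completion order. The names group_when_complete, step2_completions, ORDER and SIZES are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple

Event = Tuple[str, str]  # (vehicle_type, vehicle), emitted when a step2 task finishes


def group_when_complete(
    events: Iterable[Event], expected: Dict[str, int]
) -> Iterator[Tuple[str, List[str]]]:
    """Yield (vehicle_type, vehicles) the moment a type has all expected members.

    Models the groupKey(key, size) + groupTuple() rule: a group is released
    as soon as it is full, without waiting for the rest of the stream.
    Groups that never fill up are not emitted by this sketch.
    """
    pending: Dict[str, List[str]] = defaultdict(list)
    for vehicle_type, vehicle in events:
        pending[vehicle_type].append(vehicle)
        if len(pending[vehicle_type]) == expected[vehicle_type]:
            yield vehicle_type, pending.pop(vehicle_type)


def step2_completions(order: List[Event]) -> Iterator[Event]:
    """Hypothetical stand-in for step2: logs each vehicle as it 'finishes'."""
    for vehicle_type, vehicle in order:
        print(f"step2: {vehicle}")
        yield vehicle_type, vehicle


# Made-up completion order: cars and boats finish interleaved.
ORDER: List[Event] = [
    ("cars", "cars1"), ("boats", "boats1"), ("cars", "cars2"),
    ("boats", "boats2"), ("cars", "cars3"), ("cars", "cars4"),
]
# Group sizes; in the pipeline these would be the line counts of vehicles.list.
SIZES: Dict[str, int] = {"boats": 2, "cars": 4}

if __name__ == "__main__":
    for vehicle_type, _ in group_when_complete(step2_completions(ORDER), SIZES):
        print(f"all {vehicle_type} are done")
```

Running it prints step2: cars1, step2: boats1, step2: cars2, step2: boats2, then "all boats are done" before step2: cars3 and step2: cars4, and finally "all cars are done": the boats report no longer waits for the cars.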