Context: an ETL with a bunch of tasks and dependencies, to be optimized per data source.
I have a directed graph (DiGraph) defined with the Python package NetworkX, representing dependencies between nodes (tables).
I plan to use that DAG to generate optimized workflows, depending on data source availability.
Let’s say I have two data collections as input:
- one is ready at 5 AM (collect_A)
- the second one at 6 AM (collect_B)
Currently I wait until 6 AM to trigger everything, but the business needs the data earlier.
That’s why I’m looking for a way to traverse the graph and generate optimized, source-oriented workflows with maximum parallelism (between workflows, and inside workflows).
Quick code to generate a graph with a simple but parallelizable workflow:
import networkx as nx
G = nx.DiGraph()
G.add_edge("collect_A", "job_bronze_A")
G.add_edge("collect_B", "job_bronze_B")
G.add_edge("job_bronze_A", "job_silver_A")
G.add_edge("job_bronze_B", "job_silver_B")
G.add_edge("job_silver_A", "job_gold_A")
G.add_edge("job_silver_B", "job_gold_A")
G.add_edge("job_silver_A", "job_gold_B")
G.add_edge("job_silver_B", "job_gold_B")
topological_sort = list(nx.topological_sort(G))
print("topological_sort : ", topological_sort)
generations = list(nx.topological_generations(G))
print("generations : ", generations)
With that simple DAG, my goal is to split it into:
Workflow_A : {
    "tasks" : [
        {"task": "collect_A", "dependOn": []},
        {"task": "job_bronze_A", "dependOn": ["collect_A"]},
        {"task": "job_silver_A", "dependOn": ["job_bronze_A"]}
    ],
    "dependOn" : []
}
Workflow_B : {
    "tasks" : [
        {"task": "collect_B", "dependOn": []},
        {"task": "job_bronze_B", "dependOn": ["collect_B"]},
        {"task": "job_silver_B", "dependOn": ["job_bronze_B"]}
    ],
    "dependOn" : []
}
Workflow_C : {
    "tasks" : [
        {"task": "job_gold_A", "dependOn": []},
        {"task": "job_gold_B", "dependOn": []}
    ],
    "dependOn" : ["Workflow_A", "Workflow_B"]
}
With those results, I’m able to define my global workflow, in parallel:
A (parallel)
B (parallel)
C (dependOn A & B)
I tried using nx.topological_sort(), but it returns the order for the entire graph.
topological_sort : ['collect_A', 'collect_B', 'job_bronze_A', 'job_bronze_B', 'job_silver_A', 'job_silver_B', 'job_gold_A', 'job_gold_B']
I also tried topological_generations, which groups jobs like this:
generations : [['collect_A', 'collect_B'], ['job_bronze_A', 'job_bronze_B'], ['job_silver_A', 'job_silver_B'], ['job_gold_A', 'job_gold_B']]
I’m not satisfied with this: the jobs are grouped by generation and the generations run in sequence, so the A branch still has to wait for collect_B. No source-oriented parallelism is gained this way.
I don’t want to dive into custom code that will take time to write, when I hope a graph traversal algorithm for this already exists.
Do you have an idea?
Thank you.
You have to define the entry points (“collect_A”, “collect_B”) and then check, for each job, which entry points are among its ancestors.
import networkx as nx
G = nx.DiGraph()
G.add_edge("collect_A", "job_bronze_A")
G.add_edge("collect_B", "job_bronze_B")
G.add_edge("job_bronze_A", "job_silver_A")
G.add_edge("job_bronze_B", "job_silver_B")
G.add_edge("job_silver_A", "job_gold_A")
G.add_edge("job_silver_B", "job_gold_A")
G.add_edge("job_silver_A", "job_gold_B")
G.add_edge("job_silver_B", "job_gold_B")
A = []  # jobs that depend only on collect_A
B = []  # jobs that depend only on collect_B
C = []  # jobs that depend on both sources
for node in nx.topological_sort(G):  # topological order keeps each list runnable top to bottom
    ancestors = nx.ancestors(G, node)
    if ("collect_A" in ancestors) and ("collect_B" in ancestors):
        C.append(node)
    elif ("collect_A" in ancestors) or (node == "collect_A"):
        A.append(node)
    elif ("collect_B" in ancestors) or (node == "collect_B"):
        B.append(node)
print(f"A : {A}")
print(f"B : {B}")
print(f"C : {C}")
# A : ['collect_A', 'job_bronze_A', 'job_silver_A']
# B : ['collect_B', 'job_bronze_B', 'job_silver_B']
# C : ['job_gold_A', 'job_gold_B']
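If you later have more than two sources, the same idea generalizes: group every node by the set of entry points (nodes without predecessors) found among its ancestors, and derive the workflow-level dependencies from edges that cross groups. A sketch reusing G from above (the function name split_into_workflows and the Workflow_<n> naming are my own choices):

import networkx as nx
from collections import defaultdict

def split_into_workflows(G):
    # entry points = nodes with no predecessors (collect_A, collect_B here)
    entries = {n for n in G if G.in_degree(n) == 0}
    # group every node by the frozenset of entry points it depends on
    groups = defaultdict(list)
    for node in nx.topological_sort(G):  # keeps each group in executable order
        sources = frozenset((nx.ancestors(G, node) | {node}) & entries)
        groups[sources].append(node)
    # name the groups: single-source workflows first, then combined ones
    order = sorted(groups, key=lambda s: (len(s), sorted(s)))
    names = {src: f"Workflow_{i}" for i, src in enumerate(order)}
    node_wf = {n: names[src] for src, nodes in groups.items() for n in nodes}
    workflows = {}
    for src, nodes in groups.items():
        name = names[src]
        tasks = [{"task": n,
                  # keep only dependencies that live in the same workflow
                  "dependOn": [p for p in G.predecessors(n) if node_wf[p] == name]}
                 for n in nodes]
        # a workflow depends on every other workflow that owns one of its predecessors
        dep_wfs = sorted({node_wf[p] for n in nodes for p in G.predecessors(n)
                          if node_wf[p] != name})
        workflows[name] = {"tasks": tasks, "dependOn": dep_wfs}
    return workflows

For the example graph this yields the same A/B/C split as above: two independent single-source workflows, plus a third whose dependOn lists both.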
If this is a one-off solution, manually defining computation branches (as I have done above) is fine. If this is a recurring problem and/or the computation graph is very complex, you may want to look into proper pipeline frameworks like snakemake and ruffus. Though the syntax differs, the basic ideas are very similar:
- You annotate each node in your computation graph (“job”) with its input and output data structures (and virtual environment). This allows the framework to compute the dependency graph.
- Upon execution, all steps that can run in parallel do run in parallel (within specified resource constraints), as sketched below. For each job, if the defined input file(s) exist and are new or have changed since the last execution, the job is executed. In this way, jobs are only executed if they can be, and if they have to be (no computations are repeated that don’t have to be).
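Even without a framework, you can get the scheduling half of this directly from the graph: start every job as soon as all of its predecessors have finished (the file-freshness checks that snakemake adds are left out here). A minimal sketch with concurrent.futures, where run_task is a hypothetical stand-in for your real job launcher:

from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait

def run_task(node):
    print(f"running {node}")  # hypothetical: replace with the real job trigger

def run_dag(G, max_workers=4):
    remaining = dict(G.in_degree())  # unmet-dependency count per node
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # start with the entry points (no unmet dependencies)
        futures = {pool.submit(run_task, n): n
                   for n, d in remaining.items() if d == 0}
        while futures:
            done, _ = wait(futures, return_when=FIRST_COMPLETED)
            for fut in done:
                finished = futures.pop(fut)
                fut.result()  # re-raise any task failure
                for succ in G.successors(finished):
                    remaining[succ] -= 1
                    if remaining[succ] == 0:  # all dependencies met: start now
                        futures[pool.submit(run_task, succ)] = succ

run_dag(G)

With this, job_bronze_A starts as soon as collect_A is done, regardless of collect_B, which is exactly the source-oriented parallelism asked for.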