Context: an ETL with a bunch of tasks and dependencies, to be optimized per data source.
I have a directed graph (DiGraph) defined with the Python package NetworkX, representing dependencies between various nodes (tables).
I plan to use that DAG to generate optimized workflows, depending on data source availability.
Let’s say I have two data collections as input:
- The first one is ready at 5 AM (collect_A)
- The second one is ready at 6 AM (collect_B)
Currently, I wait until 6 AM to trigger everything, but the business needs the data earlier.
That’s why I’m looking for a way to traverse that graph and generate optimized workflows with maximum parallelism per source (both between workflows and inside each workflow).
Quick code to generate a graph with a simple but parallelizable workflow:
import networkx as nx
G = nx.DiGraph()
G.add_edge("collect_A", "job_bronze_A")
G.add_edge("collect_B", "job_bronze_B")
G.add_edge("job_bronze_A", "job_silver_A")
G.add_edge("job_bronze_B", "job_silver_B")
G.add_edge("job_silver_A", "job_gold_A")
G.add_edge("job_silver_B", "job_gold_A")
G.add_edge("job_silver_A", "job_gold_B")
G.add_edge("job_silver_B", "job_gold_B")
topological_sort = list(nx.topological_sort(G))
print("topological_sort : ",topological_sort)
generations = list(nx.topological_generations(G))
print("generations : ",generations)
With that simple DAG, my goal is to split it into:
Workflow_A : {
"tasks" : [{
"task":"collect_A",
"dependOn":[]
},
{
"task":"job_bronze_A",
"dependOn":["collect_A"]
},
{
"task":"job_silver_A",
"dependOn":["job_bronze_A"]
}],
"dependOn" : []
}
Workflow_B : {
"tasks" : [{
"task":"collect_B",
"dependOn":[]
},
{
"task":"job_bronze_B",
"dependOn":["collect_B"]
},
{
"task":"job_silver_B",
"dependOn":["job_bronze_B"]
}],
"dependOn" : []
}
Workflow_C : {
"tasks" : [{
"task":"job_gold_A",
"dependOn":[]
},
{
"task":"job_gold_B",
"dependOn":[]
}],
"dependOn" : ["Workflow_A","Workflow_B"]
}
With that result, I’m able to define my global workflow, with A and B running in parallel:
A (parallel)
B (parallel)
C (dependOn A & B)
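One way to express this split in code, as a rough sketch rather than an established algorithm: key each node by the set of source nodes (nodes with no incoming edge) it transitively depends on, and put nodes sharing the same key in the same workflow. The function name split_by_sources and the "workflow_<sources>" naming scheme are hypothetical; only nx.ancestors, nx.topological_sort, G.in_degree and G.predecessors come from NetworkX. Because an edge that crosses two groups always goes from a smaller source set to a strictly larger one, the workflow-level dependencies should stay acyclic.

```python
from collections import defaultdict
import json

import networkx as nx


def split_by_sources(G):
    """Group tasks by the set of source nodes they transitively depend on."""
    # Sources are nodes without incoming edges (collect_A and collect_B here).
    sources = {n for n, deg in G.in_degree() if deg == 0}

    key_of = {}
    groups = defaultdict(list)
    # Topological order keeps the tasks of each group in a runnable order.
    for node in nx.topological_sort(G):
        key = frozenset((nx.ancestors(G, node) | {node}) & sources)
        key_of[node] = key
        groups[key].append(node)

    def name(key):
        # Hypothetical naming scheme: join the names of the sources.
        return "workflow_" + "+".join(sorted(key))

    workflows = {}
    for key, nodes in groups.items():
        members = set(nodes)
        # Task-level dependencies only keep predecessors inside the workflow.
        tasks = [
            {"task": n,
             "dependOn": sorted(p for p in G.predecessors(n) if p in members)}
            for n in nodes
        ]
        # Predecessors outside the workflow become workflow-level dependencies.
        upstream = sorted({
            name(key_of[p])
            for n in nodes
            for p in G.predecessors(n)
            if p not in members
        })
        workflows[name(key)] = {"tasks": tasks, "dependOn": upstream}
    return workflows


G = nx.DiGraph()
G.add_edges_from([
    ("collect_A", "job_bronze_A"), ("collect_B", "job_bronze_B"),
    ("job_bronze_A", "job_silver_A"), ("job_bronze_B", "job_silver_B"),
    ("job_silver_A", "job_gold_A"), ("job_silver_B", "job_gold_A"),
    ("job_silver_A", "job_gold_B"), ("job_silver_B", "job_gold_B"),
])
workflows = split_by_sources(G)
print(json.dumps(workflows, indent=2))
```

On the example graph this yields three groups: one fed only by collect_A, one fed only by collect_B, and one containing the gold jobs that depends on the other two, which matches Workflow_A, Workflow_B and Workflow_C above.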
I tried using nx.topological_sort(), but it returns a single order for the entire graph:
topological_sort : ['collect_A', 'collect_B', 'job_bronze_A', 'job_bronze_B', 'job_silver_A', 'job_silver_B', 'job_gold_A', 'job_gold_B']
I also tried nx.topological_generations(), which groups jobs like this:
generations : [['collect_A', 'collect_B'], ['job_bronze_A', 'job_bronze_B'], ['job_silver_A', 'job_silver_B'], ['job_gold_A', 'job_gold_B']]
I’m not satisfied with this. The jobs are grouped by generation, and the generations are executed in sequence, so nothing on the A side can run ahead of the B side.
I don’t want to dive into time-consuming custom code when I hope that a graph traversal algorithm already exists for this.
Do you have any ideas?
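To make the problem with generations concrete, here is a small sketch that propagates the source availability times through the graph. It assumes the ready hours from the example (5 for collect_A, 6 for collect_B) and, as a simplification, ignores job durations; the names ready_at and earliest are hypothetical.

```python
import networkx as nx

G = nx.DiGraph()
G.add_edges_from([
    ("collect_A", "job_bronze_A"), ("collect_B", "job_bronze_B"),
    ("job_bronze_A", "job_silver_A"), ("job_bronze_B", "job_silver_B"),
    ("job_silver_A", "job_gold_A"), ("job_silver_B", "job_gold_A"),
    ("job_silver_A", "job_gold_B"), ("job_silver_B", "job_gold_B"),
])

# Hour at which each source becomes available (taken from the example).
ready_at = {"collect_A": 5, "collect_B": 6}

# Earliest hour a task could start: its own ready time for a source,
# otherwise the latest earliest-start among its predecessors.
# Assumption: job durations are ignored.
earliest = {}
for node in nx.topological_sort(G):
    preds = list(G.predecessors(node))
    if preds:
        earliest[node] = max(earliest[p] for p in preds)
    else:
        earliest[node] = ready_at[node]

for node, hour in earliest.items():
    print(f"{node}: can start from {hour}h")
```

Under these assumptions the whole A chain up to job_silver_A is unblocked as soon as collect_A is ready, while only the gold jobs actually need both sources, which the generation-based grouping does not show.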
Thank you.