Consider sklearn pipeline which contains only the following steps: Pipeline
, FeatureUnion
, Transformer
.
I can express an arbitrary sklearn pipeline in the form of a DAG, where nodes correspond to data states and edges correspond to pipeline steps. From the sklearn example (assume ColumnSelector
is implemented) the following pipeline:
column_trans = make_column_transformer(
(OneHotEncoder(), ['city']),
(CountVectorizer(), 'title'),
remainder=MinMaxScaler())
can be rewritten as:
FeatureUnion([
("cat_ohe", Pipeline([
("selector", ColumnSelector(["city"])),
("ohe", OneHotEncoder())
])),
("cat_cv", Pipeline([
("selector", ColumnSelector("title")),
("cv", CountVectorizer())
])),
("rem", Pipeline([
("selector", ColumnSelector(exclude=["city", "title"])),
("scaler", MinMaxScaler())
])),
])
and embedded into an SQL table:
edge_id,source_node_id,target_node_id,transformer
0,0,1,'ColumnSelector'
1,1,2,'OneHotEncoder'
2,2,3,'Concat'
3,0,4,'ColumnSelector'
4,4,5,'CountVectorizer'
5,5,3,'Concat'
6,0,6,'ColumnSelector'
7,6,7,'MinMaxScaler'
8,7,3,'Concat'
I want to implement serialization and deserialization of the pipeline. Below is my attempt at that; please provide comments on how to improve deserialization and how to implement serialization.
from sklearn.pipeline import Pipeline, FeatureUnion, make_pipeline, make_union
from sklearn.impute import SimpleImputer
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.preprocessing import StandardScaler, Normalizer
# Pipeline DAG encoded as edges: (edge_id, source_node, target_node, transformer).
# A `None` transformer marks a "Concat" edge — the point where parallel branches
# re-join (deserialized as the end of a FeatureUnion).
# Shape of this graph: 1 -> 2 fans out to (3 -> 5) and (4), both branches
# concat into node 6, then 6 -> 7 applies the final Normalizer.
edges = [
(1, 1, 2, SimpleImputer()),
(2, 2, 3, TruncatedSVD()),
(3, 2, 4, PCA()),
(4, 3, 5, StandardScaler()),
(5, 4, 6, None),
(6, 5, 6, None),
(7, 6, 7, Normalizer())
]
def _combine(steps):
    """Collapse a list of fitted-API steps into one step.

    Returns None for an empty list, the single step for a one-element
    list, and a Pipeline otherwise.  Note the splat: make_pipeline takes
    *steps, not a list (passing a list would wrap it as one bogus step).
    """
    if not steps:
        return None
    if len(steps) == 1:
        return steps[0]
    return make_pipeline(*steps)


def _parse_from(node, edge_list):
    """Walk the DAG from `node` and build the corresponding estimator.

    Returns a uniform (step, join_node) pair:
      * step      -- the assembled estimator/Pipeline, or None if the walk
                     consumed no transformer edges;
      * join_node -- the node reached via a Concat (None-transformer) edge,
                     or None if the walk ended at a true terminal node.
    The uniform arity is what lets the parallel branch below unpack the
    recursive result safely even when a branch hits a terminal node.
    """
    steps = []
    while True:
        out_edges = [e for e in edge_list if e[1] == node]
        if not out_edges:
            # True terminal: no outgoing edges at all.
            return _combine(steps), None
        if len(out_edges) == 1:
            # Sequential node: follow the single outgoing edge.
            _, _, target, transformer = out_edges[0]
            if transformer is None:
                # Concat edge: stop here and report where branches re-join.
                return _combine(steps), target
            steps.append(transformer)
            node = target
        else:
            # Parallel node: each outgoing edge starts a FeatureUnion branch.
            branches = []
            join_nodes = []
            for _, _, target, transformer in out_edges:
                sub_step, join = _parse_from(target, edge_list)
                if sub_step is None:
                    branches.append(transformer)
                else:
                    branches.append(make_pipeline(transformer, sub_step))
                join_nodes.append(join)
            # All branches of a union must reconverge on the same node;
            # raise (not assert — asserts vanish under -O) on a malformed DAG.
            if len(set(join_nodes)) != 1:
                raise ValueError(
                    "parallel branches from node %r do not reconverge: %r"
                    % (node, join_nodes)
                )
            branch_step = branches[0] if len(branches) == 1 else make_union(*branches)
            steps.append(branch_step)
            # Resume the sequential walk after the join point.  If every
            # branch hit a terminal (join is None) the next iteration finds
            # no outgoing edges and returns.
            node = join_nodes[0]


def parse_pipeline(node, edge_list=None):
    """Deserialize the edge-list DAG starting at `node` into an estimator.

    Parameters
    ----------
    node : hashable
        Source node id to start the walk from.
    edge_list : list of (edge_id, source, target, transformer), optional
        The DAG to parse; defaults to the module-level `edges`.

    Returns
    -------
    A transformer, Pipeline, or FeatureUnion (or None for an empty graph).
    """
    if edge_list is None:
        edge_list = edges
    step, _ = _parse_from(node, edge_list)
    return step
# Smoke test: deserialize the sample DAG starting at its root node (1)
# and print the reconstructed estimator.
p = parse_pipeline(1)
print(p)