I am building a pipeline composed of 6 components, and I am currently working on the 4th component.
In my script, I created a Sweep Job to tune the hyperparameters of my model, which is an autoencoder. The script performs the following tasks:
- Builds and trains the autoencoder
- Keeps the trained autoencoder
- Extracts the encoder layers (for analysis)
- Fits the StandardScaler object
- Logs metrics using MLflow
I am saving the following:
- The autoencoder model
- The encoder layers
- The StandardScaler object (for analysis)
- The ScalerAutoencoderWrapper (fitted StandardScaler + Keras model .predict)
mlflow.sklearn.log_model() raised a warning that the .predict method is missing (as far as I understand, that flavor is meant for end-to-end scikit-learn pipelines and estimators), so I use a custom MLflow pyfunc instead.
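For context, the wrapper itself is essentially a custom pyfunc model along these lines (a simplified sketch; the real class may have a few extra details):

import mlflow.pyfunc

class ScalerAutoencoderWrapper(mlflow.pyfunc.PythonModel):
    """Apply the fitted StandardScaler, then run the Keras autoencoder."""

    def __init__(self, scaler, autoencoder):
        self.scaler = scaler
        self.autoencoder = autoencoder

    def predict(self, context, model_input):
        # Scale the raw input, then reconstruct it with the autoencoder
        scaled = self.scaler.transform(model_input)
        return self.autoencoder.predict(scaled)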
After several experiment runs, I realized that the Sweep Job component automatically outputs the best child run. I cannot use mlflow_model as the output type because I have four different artifacts, not just one (and I need all four).
I thought of using uri_folder, but I am unsure how to iterate through the uri_folder to get my ScalerAutoencoderWrapper, or how to use MLflow to deploy the model in the next step.
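To make the question concrete, this is roughly what I would like to do in the next component (illustrative only; load_wrapper and the folder layout are assumptions, not working code):

import os
import mlflow.pyfunc

def load_wrapper(model_dir: str):
    # model_dir would be the uri_folder handed over by the sweep step;
    # the pyfunc model is assumed to live under the 'scaler_autoencoder_wrapper' subfolder
    wrapper_path = os.path.join(model_dir, "scaler_autoencoder_wrapper")
    return mlflow.pyfunc.load_model(wrapper_path)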
My questions are:
- How can I output all these folders, especially the ScalerAutoencoderWrapper, and pass them along as a uri_folder?
- How can I use MLflow to deploy the ScalerAutoencoderWrapper in the next component while retaining the other files?
Any feedback on how the files are saved is also welcome.
Thanks a lot!
I have attached my code.
.... PARTIAL CODE ...
# Log Model
with mlflow.start_run() as run:
    run_id = run.info.run_id

    # Build Model
    autoencoder, encoder = build_model(
        input_dim=input_dim,
        hidden_layers=hidden_layers,
        encoded_dim=encoded_dim,
        l1_regularizer=l1_regularizer,
        learning_rate=learning_rate,
        return_encoder=return_encoder,
    )

    # Define Strategy
    early_stopping = EarlyStopping(
        monitor=MONITOR,
        patience=patience,
        restore_best_weights=True
    )

    # Fit & Keep History
    autoencoder.fit(
        X_scaled,
        X_scaled,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(X_validate_scaled, X_validate_scaled),
        callbacks=[early_stopping, MLflowCallback()],  # Log the final validation loss
    )

    # Save Model artifacts: build input examples for the model signatures
    # (scalerObj is already fitted on the training data, so only transform here)
    input_raw_example = X_train.iloc[:5]
    input_transformed = scalerObj.transform(input_raw_example)

    # Artifact Names
    scaler_pkl = 'scaler.pkl'
    encoder_folder = 'encoder'
    autoencoder_folder = 'autoencoder'
    autoencoder_wrapper_folder = 'scaler_autoencoder_wrapper'

    # Save StandardScaler Object
    print("--------------> Save Object Scaler")
    with open(scaler_pkl, "wb") as f:
        pickle.dump(scalerObj, f)
    mlflow.log_artifact(scaler_pkl)

    # Save encoder layers
    print("--------------> Save Encoder")
    mlflow.keras.log_model(encoder, encoder_folder, input_example=input_transformed)

    # Save Autoencoder model only
    print("--------------> Save AutoEncoder")
    mlflow.keras.log_model(autoencoder, autoencoder_folder, input_example=input_transformed)

    # Save StandardScaler + Autoencoder wrapper as a custom pyfunc model
    print("--------------> Save ScalerAutoencoderWrapper")
    scaler_autoencoder_wrapper = ScalerAutoencoderWrapper(
        scaler=scalerObj,
        autoencoder=autoencoder
    )
    mlflow.pyfunc.log_model(
        artifact_path=autoencoder_wrapper_folder,
        python_model=scaler_autoencoder_wrapper,
        input_example=input_transformed,
        signature=infer_signature(
            model_input=input_transformed,
            model_output=scaler_autoencoder_wrapper.predict(
                context=None,
                model_input=input_raw_example
            )
        ),
    )

    print(f"Training Completed, Model and Scaler saved with id : {run_id}")
My pipeline code:
# Create Nodes for Pipelines
@pipeline(default_compute='XXXX',
          display_name="ABCDE",
          experiment_name="EFGH",
          tags={'objective': 'DONTKNOW'})
def pipeline_autoencoder(input_file):
    # Step 1: Local Feature Selection
    feature_extraction_step = feature_extraction(
        input_file=input_file,
    )

    # Step 2: Local Split Selection
    data_split_step = data_split(
        input_file=feature_extraction_step.outputs.output_file,
    )

    # Step 3: Hyperparameter tuning (Sweep Job)
    train_model_step = train_tune_model(
        x_train=data_split_step.outputs.x_train_path,
        y_train=data_split_step.outputs.y_train_path,
        x_validate=data_split_step.outputs.x_validate_path,
        y_validate=data_split_step.outputs.y_validate_path,
        hidden_layers=Choice([str, str]),
        encoded_dim=Choice([int]),
        l1_regularizer=Choice([float, float]),
        learning_rate=Choice([float, float]),
        batch_size=Choice([int, int]),
        epochs=Choice([int, int]),
        patience=Choice([int, int]),
    )

    # Overwrite the training step with a sweep
    sweep_step = train_model_step.sweep(
        compute='XXXX',
        primary_metric="METRIC",
        goal="MINIMIZE",
        sampling_algorithm="RANDOM",
    )
    sweep_step.early_termination = BanditPolicy(
        evaluation_interval=INT,
        slack_factor=FLOAT,
        delay_evaluation=INT)
    sweep_step.set_limits(max_total_trials=INT, max_concurrent_trials=INT, timeout=INT)

    # Step 4: deploy the best child (NOT DONE YET)

    return {
        'model_output': sweep_step.outputs.model_output,
        "x_test": data_split_step.outputs.x_test_path,
        "y_test": data_split_step.outputs.y_test_path,
    }
What I have tried so far:
- I tried to output the files from the sweep job as a uri_folder, but I only got an encoded blob storage file.
- I tried to iterate over it with os.walk(path), but it came back empty.
- I tried to follow this Azure hyperparameter tuning example; perhaps you can tell me why they do this:
# train model
model = train_model(params, X_train, X_test, y_train, y_test)
# Output the model and test data
# write to local folder first, then copy to output folder
mlflow.sklearn.save_model(model, "model")
from distutils.dir_util import copy_tree
# copy subdirectory example
from_directory = "model"
to_directory = args.model_output
copy_tree(from_directory, to_directory)
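If I understand that example correctly, they save the MLflow model to a local folder first and then copy it into the declared output folder, so that the best child's files end up in the uri_folder exposed by the sweep step. My rough attempt at adapting that pattern (assuming args.model_output is my component's uri_folder output; the local folder name is just illustrative):

import os
import shutil

# Write the pyfunc wrapper to a local folder, then copy it (plus the scaler)
# into the component's output folder so the sweep step exposes them as a uri_folder
mlflow.pyfunc.save_model(
    path="scaler_autoencoder_wrapper_local",
    python_model=scaler_autoencoder_wrapper,
    input_example=input_transformed,
)
shutil.copytree(
    "scaler_autoencoder_wrapper_local",
    os.path.join(args.model_output, "scaler_autoencoder_wrapper"),
    dirs_exist_ok=True,
)
shutil.copy("scaler.pkl", args.model_output)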