In this tutorial, we build a complete, production-grade ML experimentation and deployment workflow using MLflow. We start by launching a dedicated MLflow Tracking Server with a structured backend and artifact store, enabling us to track experiments in a scalable, reproducible manner. We then train multiple machine learning models using a nested hyperparameter sweep while automatically logging parameters, metrics, and model artifacts. We enhance the experiment by logging diagnostic visualizations, evaluating the best model using MLflow’s built-in evaluation framework, and storing detailed evaluation results for future analysis. We also deploy the trained model using MLflow’s native serving capabilities and interact with it via a REST API, demonstrating how MLflow bridges the gap between experimentation and real-world model deployment.
!pip -q install "mlflow>=3.0.0" scikit-learn pandas numpy matplotlib requests

import os
import time
import json
import shutil
import socket
import signal
import subprocess
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import requests

from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    roc_auc_score,
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
    ConfusionMatrixDisplay,
)

import mlflow
import mlflow.sklearn
from mlflow.models.signature import infer_signature


def _is_port_open(host: str, port: int, timeout_s: float = 0.2) -> bool:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(timeout_s)
        return s.connect_ex((host, port)) == 0


def _wait_for_http(url: str, timeout_s: int = 30) -> None:
    t0 = time.time()
    last_err = None
    while time.time() - t0 < timeout_s:
        try:
            r = requests.get(url, timeout=1)
            if r.status_code < 500:
                return
        except Exception as e:
            last_err = e
        time.sleep(0.5)
    raise RuntimeError(f"Server not ready at {url}. Last error: {last_err}")


def _safe_kill(proc: subprocess.Popen):
    if proc is None:
        return
    try:
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
    except Exception:
        pass
We install all required dependencies and import the complete MLflow, scikit-learn, and system libraries needed for experiment tracking and deployment. We define utility functions that allow us to check port availability, wait for server readiness, and safely terminate background processes. We establish the foundational infrastructure to ensure our MLflow tracking server and model-serving components operate reliably in the Colab environment.
BASE_DIR = Path("/content/mlflow_colab_demo").resolve()
BACKEND_DB = BASE_DIR / "mlflow.db"
ARTIFACT_ROOT = BASE_DIR / "mlartifacts"
os.makedirs(BASE_DIR, exist_ok=True)
os.makedirs(ARTIFACT_ROOT, exist_ok=True)

HOST = "127.0.0.1"
PORT = 5000
TRACKING_URI = f"http://{HOST}:{PORT}"

if _is_port_open(HOST, PORT):
    for p in range(5001, 5015):
        if not _is_port_open(HOST, p):
            PORT = p
            TRACKING_URI = f"http://{HOST}:{PORT}"
            break

print("Using TRACKING_URI:", TRACKING_URI)
print("Backend DB:", BACKEND_DB)
print("Artifact root:", ARTIFACT_ROOT)

server_cmd = [
    "mlflow", "server",
    "--host", HOST,
    "--port", str(PORT),
    "--backend-store-uri", f"sqlite:///{BACKEND_DB}",
    "--default-artifact-root", str(ARTIFACT_ROOT),
]

mlflow_server = subprocess.Popen(
    server_cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True,
)

_wait_for_http(TRACKING_URI, timeout_s=45)
mlflow.set_tracking_uri(TRACKING_URI)
print("MLflow server is up.")

EXPERIMENT_NAME = "colab-advanced-mlflow-sklearn"
mlflow.set_experiment(EXPERIMENT_NAME)
We configure the MLflow backend storage and artifact directories to create a structured, persistent experiment-tracking environment. We launch the MLflow Tracking Server with a SQLite database and a local artifact store, enabling full experiment logging and management. We connect our notebook to the running MLflow server and initialize a dedicated experiment that will organize all training runs and associated metadata.
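One detail of the server command worth calling out: a SQLite backend store URI is `sqlite:///` followed by an absolute path, so the assembled string ends up with four slashes in a row. A quick stdlib-only illustration (the paths mirror the tutorial's Colab layout):

```python
from pathlib import Path

# Mirrors the tutorial's directory layout; any absolute path behaves the same.
BASE_DIR = Path("/content/mlflow_colab_demo")
BACKEND_DB = BASE_DIR / "mlflow.db"

# "sqlite:///" + "/content/..." -> four slashes for an absolute path.
backend_uri = f"sqlite:///{BACKEND_DB}"
print(backend_uri)  # sqlite:////content/mlflow_colab_demo/mlflow.db
```

This is the standard SQLAlchemy convention MLflow follows, so the four-slash URI in the server logs is expected, not a typo.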
data = load_breast_cancer(as_frame=True)
df = data.frame.copy()
target_col = "target"

X = df.drop(columns=[target_col])
y = df[target_col].astype(int)

# Create the train/test split used by all runs below
# (a stratified 75/25 split with a fixed seed).
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, stratify=y, random_state=42
)

mlflow.sklearn.autolog(
    log_input_examples=False,
    log_model_signatures=False,
    silent=True
)

C_VALUES = [0.01, 0.1, 1.0, 3.0]
SOLVERS = ["liblinear", "lbfgs"]
best = {"auc": -1.0, "run_id": None, "params": None}
We load the dataset and prepare the training and testing splits required for machine learning experimentation. We enable MLflow autologging, allowing automatic tracking of parameters, metrics, and model artifacts without manual intervention. We define the hyperparameter search space and initialize the structure to identify and store the best-performing model configuration.
with mlflow.start_run(run_name="parent_sweep_run") as parent_run:
    mlflow.log_param("dataset", "sklearn_breast_cancer")
    mlflow.log_param("n_features", X_train.shape[1])
    mlflow.log_param("n_train", X_train.shape[0])
    mlflow.log_param("n_test", X_test.shape[0])

    for C in C_VALUES:
        for solver in SOLVERS:
            with mlflow.start_run(run_name=f"child_C={C}_solver={solver}", nested=True) as child_run:
                pipe = Pipeline([
                    ("scaler", StandardScaler()),
                    ("clf", LogisticRegression(
                        C=C, solver=solver, penalty="l2",
                        max_iter=2000, random_state=42
                    ))
                ])
                pipe.fit(X_train, y_train)

                proba = pipe.predict_proba(X_test)[:, 1]
                pred = (proba >= 0.5).astype(int)

                auc = roc_auc_score(y_test, proba)
                acc = accuracy_score(y_test, pred)
                prec = precision_score(y_test, pred, zero_division=0)
                rec = recall_score(y_test, pred, zero_division=0)
                f1 = f1_score(y_test, pred, zero_division=0)

                mlflow.log_metrics({
                    "test_auc": float(auc),
                    "test_accuracy": float(acc),
                    "test_precision": float(prec),
                    "test_recall": float(rec),
                    "test_f1": float(f1),
                })

                cm = confusion_matrix(y_test, pred)
                disp = ConfusionMatrixDisplay(cm, display_labels=data.target_names)
                fig, ax = plt.subplots(figsize=(5, 4))
                disp.plot(ax=ax, values_format="d")
                ax.set_title(f"Confusion Matrix (C={C}, solver={solver})")
                cm_path = BASE_DIR / "confusion_matrix.png"
                fig.tight_layout()
                fig.savefig(cm_path, dpi=140)
                plt.close(fig)
                mlflow.log_artifact(str(cm_path), artifact_path="diagnostics")

                if auc > best["auc"]:
                    best.update({
                        "auc": float(auc),
                        "run_id": child_run.info.run_id,
                        "params": {"C": C, "solver": solver}
                    })

    mlflow.log_dict(best, "best_run_summary.json")

print("Best config:", best)
We perform a nested hyperparameter sweep, training multiple models within a structured parent-child run hierarchy. We compute performance metrics and log them alongside diagnostic artifacts, such as confusion matrices, to enable detailed analysis of experiments. We continuously monitor model performance and update our tracking structure to identify the best configuration across all training runs.
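The selection logic inside the sweep is a running argmax over the C × solver grid. Stripped of MLflow and model training, the bookkeeping pattern looks like this; note that `mock_score` below is a hypothetical stand-in for fitting a pipeline and computing test AUC, not the real evaluation.

```python
from itertools import product

C_VALUES = [0.01, 0.1, 1.0, 3.0]
SOLVERS = ["liblinear", "lbfgs"]

def mock_score(C, solver):
    # Hypothetical stand-in for training the pipeline and computing test AUC.
    return 0.90 + 0.01 * C - (0.005 if solver == "lbfgs" else 0.0)

best = {"auc": -1.0, "run_id": None, "params": None}
for C, solver in product(C_VALUES, SOLVERS):  # same 4 x 2 = 8 combinations
    auc = mock_score(C, solver)
    if auc > best["auc"]:
        best.update({"auc": auc, "params": {"C": C, "solver": solver}})

print(best["params"])  # {'C': 3.0, 'solver': 'liblinear'} under the mock scorer
```

In the tutorial, each iteration additionally opens a nested MLflow run, so the `best` dict ends up holding the `run_id` of the winning child run as well.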
best_C = best["params"]["C"]
best_solver = best["params"]["solver"]

final_pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", LogisticRegression(
        C=best_C, solver=best_solver, penalty="l2",
        max_iter=2000, random_state=42
    ))
])

with mlflow.start_run(run_name="final_model_run") as final_run:
    final_pipe.fit(X_train, y_train)

    proba = final_pipe.predict_proba(X_test)[:, 1]
    pred = (proba >= 0.5).astype(int)

    metrics = {
        "test_auc": float(roc_auc_score(y_test, proba)),
        "test_accuracy": float(accuracy_score(y_test, pred)),
        "test_precision": float(precision_score(y_test, pred, zero_division=0)),
        "test_recall": float(recall_score(y_test, pred, zero_division=0)),
        "test_f1": float(f1_score(y_test, pred, zero_division=0)),
    }
    mlflow.log_metrics(metrics)
    mlflow.log_params({"C": best_C, "solver": best_solver, "model": "LogisticRegression+StandardScaler"})

    input_example = X_test.iloc[:5].copy()
    signature = infer_signature(input_example, final_pipe.predict_proba(input_example)[:, 1])

    model_info = mlflow.sklearn.log_model(
        sk_model=final_pipe,
        artifact_path="model",
        signature=signature,
        input_example=input_example,
        registered_model_name=None,
    )

    print("Final run_id:", final_run.info.run_id)
    print("Logged model URI:", model_info.model_uri)

    eval_df = X_test.copy()
    eval_df["label"] = y_test.values

    eval_result = mlflow.models.evaluate(
        model=model_info.model_uri,
        data=eval_df,
        targets="label",
        model_type="classifier",
        evaluators="default",
    )

    eval_summary = {
        "metrics": {k: float(v) if isinstance(v, (int, float, np.floating)) else str(v)
                    for k, v in eval_result.metrics.items()},
        "artifacts": {k: str(v) for k, v in eval_result.artifacts.items()},
    }
    mlflow.log_dict(eval_summary, "evaluation/eval_summary.json")
We train the final model using the best hyperparameters identified during the experiment sweep and log it with a proper signature and input example. We evaluate the model using MLflow’s built-in evaluation framework, which generates detailed metrics and evaluation artifacts. We store the evaluation summary within MLflow, ensuring the final model is fully documented, reproducible, and ready for deployment.
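The type cast inside `eval_summary` is there for a reason: metrics returned by the evaluation step can arrive as NumPy scalars or non-scalar objects, which `json` cannot serialize directly, so everything is coerced to `float` or `str` before `log_dict`. The pattern in isolation, stdlib only (the sample metrics dict is illustrative, not real evaluator output):

```python
import json

# Illustrative stand-in for eval_result.metrics; real keys and values differ.
raw_metrics = {
    "roc_auc": 0.995,
    "accuracy_score": 0.972,
    "confusion_matrix": [[52, 1], [2, 88]],  # non-scalar -> stringified
}

safe = {
    k: float(v) if isinstance(v, (int, float)) else str(v)
    for k, v in raw_metrics.items()
}
print(json.dumps(safe))  # now guaranteed JSON-serializable
```

The tutorial's version additionally checks `np.floating`, since NumPy float types are not instances of the built-in `float`.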
SERVE_PORT = 6000
if _is_port_open(HOST, SERVE_PORT):
    for p in range(6001, 6020):
        if not _is_port_open(HOST, p):
            SERVE_PORT = p
            break

MODEL_URI = model_info.model_uri

serve_cmd = [
    "mlflow", "models", "serve",
    "-m", MODEL_URI,
    "-p", str(SERVE_PORT),
    "--host", HOST,
    "--env-manager", "local"
]

mlflow_serve = subprocess.Popen(
    serve_cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True,
)

serve_url = f"http://{HOST}:{SERVE_PORT}/invocations"
_wait_for_http(f"http://{HOST}:{SERVE_PORT}", timeout_s=60)
print("Model server is up at:", serve_url)

payload = {
    "dataframe_split": {
        "columns": list(X_test.columns),
        "data": X_test.iloc[:3].values.tolist()
    }
}

r = requests.post(
    serve_url,
    headers={"Content-Type": "application/json"},
    data=json.dumps(payload),
    timeout=10
)

print("Serve status:", r.status_code)
print("Predictions (probabilities or outputs):", r.text)

print("\nOpen the MLflow UI by visiting:", TRACKING_URI)
print("Artifacts are stored under:", ARTIFACT_ROOT)
We deploy the trained MLflow model as a live REST API service using MLflow’s native serving infrastructure. We send a test request to the deployed model endpoint to verify that the model responds correctly and produces predictions in real time. We complete the full machine learning lifecycle by transitioning from experiment tracking to live model deployment within a unified MLflow workflow.
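When you are finished, the `_safe_kill` helper defined at the top is the clean way to stop both background processes: call `_safe_kill(mlflow_serve)` and then `_safe_kill(mlflow_server)`. Here is a self-contained demonstration of the helper on a harmless throwaway process (terminate politely, escalate to kill on timeout):

```python
import subprocess

def _safe_kill(proc: subprocess.Popen):
    # Same helper as at the top of the tutorial.
    if proc is None:
        return
    try:
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
    except Exception:
        pass

# Demo on a long-running but harmless process; in the tutorial you would
# pass mlflow_serve and mlflow_server instead.
p = subprocess.Popen(["sleep", "60"])
_safe_kill(p)
print("exited with:", p.returncode)  # negative signal number on POSIX
```

Stopping the processes explicitly avoids orphaned servers holding onto ports 5000 and 6000 if you re-run cells in the same Colab session.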
In conclusion, we established a fully integrated ML lifecycle pipeline using MLflow, covering experiment tracking, hyperparameter optimization, artifact logging, model evaluation, and live model serving. We created a structured environment in which every training run is tracked, reproducible, and auditable, enabling efficient experimentation and model comparison. We leveraged MLflow’s model packaging and serving infrastructure to transform trained models into deployable services with minimal effort. By completing this workflow, we demonstrated how MLflow functions as a central orchestration layer for managing machine learning systems, enabling scalable, reproducible, and production-ready ML pipelines entirely within a cloud-based notebook environment.
Michal Sutter
Michal Sutter is a data science professional with a Master of Science in Data Science from the University of Padova. With a solid foundation in statistical analysis, machine learning, and data engineering, Michal excels at transforming complex datasets into actionable insights.


