Build Good Habits - Your First ML Pipeline
Machine Learning Engineering is, at its core, an extension of Software Engineering principles and best practices. Working in a Jupyter notebook is completely different from building a production-ready ML system. Here, clean code, good abstractions, and modular design are key. In fact, a good ML project hides the complexity of the model and focuses on the data and the pipeline - you might not even know what model is being used! This is the power of a good ML pipeline.
Tutorial Goals
In this tutorial, you will:
- Set up a project with uv, DVC, and MLflow
- Build a pipeline that creates a versioned dataset
- Train a model (with XGBoost) and log metrics in MLflow
- Evaluate your model and experiment with hyperparameters
Now let’s look at what makes a good ML project special.
If you’re starting your own ML project, I’ve created a template that might be a good starting point: https://github.com/curiousily/ml-project-template (fork it and make it your own). Feel free to cut out parts you don’t need, too!
Reproducibility
Imagine you’re taking a walk in the park. Suddenly, you think of changing a few hyperparameters in your model. You go home, make the changes, and run the model. It performs better, and you’re happy. But next month, after some code cleanup, you try to reproduce the same steps and don’t get the same result. What went wrong? It’s hard to tell, because you didn’t start your project with reproducibility in mind. There’s a better way.
To ensure your project is reproducible, follow these steps:
- Set all seeds (numpy, pytorch, random); see the sketch after this list
- Use the same dataset (seed database, limit date period for data, data versioning)
- Track hyperparameters and metrics
- Maintain a reproducible environment (library versions and OS)
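The scripts later in this tutorial call a seed_everything() helper from banked.config. Here’s a minimal sketch of what such a helper might look like (the real one lives in the repo; the torch branch is optional, since this project trains with XGBoost):
import os
import random

import numpy as np


def seed_everything(seed: int = 42) -> None:
    random.seed(seed)  # Python's built-in RNG
    np.random.seed(seed)  # NumPy; pandas' sample() draws from this global state
    os.environ["PYTHONHASHSEED"] = str(seed)  # hash randomization
    try:
        import torch

        torch.manual_seed(seed)  # CPU RNG
        torch.cuda.manual_seed_all(seed)  # all GPU RNGs
    except ImportError:
        pass  # torch isn't a dependency of this project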
To help with reproducibility, I use DVC (Data Version Control) for data versioning and rerunning parts of the pipeline. DVC is an open-source version control system for machine learning projects.
Let’s clone our project (https://github.com/mlexpertio/banked/) and start it up. Follow the instructions in the README to set up the environment.
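The README is the authoritative reference, but with uv the setup typically boils down to:
git clone https://github.com/mlexpertio/banked.git
cd banked
uv sync  # create the virtual environment and install pinned dependencies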
Build Dataset
We’ve built our intuition and explored the data in the previous tutorial. Let’s create a function that builds the dataset:
from pathlib import Path
from typing import Tuple
import joblib
import pandas as pd
from loguru import logger
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer, OneHotEncoder
from banked.config import Config
def build_dataset(save_dir: Path):
    logger.debug(f"Building dataset at {save_dir}")
    save_dir.mkdir(parents=True, exist_ok=True)
    df = pd.read_csv(Config.Path.RAW_DATA_DIR / "bank.csv")
    df = _cleanup_dataset(df)
    df_train, df_test = _split_dataset(df, Config.Dataset.TEST_SIZE)
    processor = _create_preprocessor(df)
    df_train = _process_dataset(df_train, processor, fit_processor=True)
    df_train.to_parquet(save_dir / Config.Dataset.TRAIN_FILE)
    df_test = _process_dataset(df_test, processor, fit_processor=False)
    df_test.to_parquet(save_dir / Config.Dataset.TEST_FILE)
    joblib.dump(processor, save_dir / Config.Model.PROCESSOR_FILE_NAME)
The code above does the following:
- Reads the raw data
- Cleans it up (using _cleanup_dataset())
- Splits it into train and test sets
- Processes the data (using _process_dataset())
- Saves the processed (train and test) data and the preprocessor
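Paths and constants come from a central Config object in banked.config. Here’s a hypothetical sketch of what it might contain, inferred purely from how it’s used in this tutorial (the real values live in the cloned repo):
from pathlib import Path


class Config:
    SEED = 42  # hypothetical value
    ML_FLOW_URL = "http://127.0.0.1:8080"

    class Path:  # intentionally shadows pathlib.Path inside this namespace
        APP_HOME = Path(__file__).parent.parent
        RAW_DATA_DIR = APP_HOME / "data"  # hypothetical layout
        DATA_DIR = APP_HOME / "artefacts" / "data"
        MODELS_DIR = APP_HOME / "artefacts" / "models"

    class Dataset:
        TEST_SIZE = 0.2  # hypothetical value
        TRAIN_FILE = "train.parquet"  # hypothetical file name
        TEST_FILE = "test.parquet"  # hypothetical file name

    class Model:
        FILE_NAME = "model.joblib"  # hypothetical file name
        PROCESSOR_FILE_NAME = "processor.joblib"  # hypothetical file name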
The data processing is done in the _process_dataset() function:
def _process_dataset(df: pd.DataFrame, processor: Pipeline, fit_processor: bool) -> pd.DataFrame:
    X, y = df.drop(columns=["deposit"]), df["deposit"]
    # Fit only on the training data; the test set is transformed with the
    # already-fitted processor to avoid data leakage.
    X_processed = processor.fit_transform(X) if fit_processor else processor.transform(X)
    column_names = processor.named_steps["preprocessor"].get_feature_names_out()
    df = pd.DataFrame(X_processed, columns=column_names)
    df["deposit"] = y.values
    return df
Here’s what the cleanup does:
NEW_COLUMNS = [
    "age",
    "job",
    "marital_status",
    "education",
    "has_credit_default",
    "balance",
    "has_housing_loan",
    "has_personal_loan",
    "contact_type",
    "contact_day_of_week",
    "contact_month",
    "n_campaign_contact",
    "days_since_contact",
    "n_previous_contact",
    "prev_campaign_outcome",
    "deposit",
]
def _cleanup_dataset(df: pd.DataFrame) -> pd.DataFrame:
    df = df.drop_duplicates()
    # `duration` is only known after the call ends, so it would leak the target.
    df = df.drop(columns=["duration"])
    # Remove the trailing dot from values like "admin." (literal match, not regex).
    df["job"] = df.job.str.replace(".", "", regex=False)
    df["deposit"] = df.deposit.map({"yes": 1, "no": 0})
    df.columns = NEW_COLUMNS
    return df
This is very similar to what we did in our notebook. Splitting is easy too:
def _split_dataset(df: pd.DataFrame, test_size: float) -> Tuple[pd.DataFrame, pd.DataFrame]:
    # sample() without an explicit random_state uses NumPy's global RNG,
    # which seed_everything() has already seeded.
    df_train = df.sample(frac=1.0 - test_size)
    df_test = df.drop(df_train.index)
    return df_train, df_test
Note that we’re saving the data into Parquet files. Parquet is a columnar storage format that is very efficient for reading and writing data. It’s also a good choice for versioning data.
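As a standalone illustration (toy data), Parquet preserves dtypes across a round trip, something CSV can’t guarantee:
import pandas as pd

df = pd.DataFrame({"deposit": [1, 0], "balance": [1500.0, -200.0]})
df.to_parquet("example.parquet")  # needs pyarrow (or fastparquet) installed

# int64 and float64 come back as-is; CSV would have to re-infer them on read.
print(pd.read_parquet("example.parquet").dtypes)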
Feature Engineering
The data processing is handled by a scikit-learn Pipeline that includes feature engineering and preprocessing:
CATEGORICAL_FEATURES = [
    "job",
    "marital_status",
    "education",
    "has_credit_default",
    "has_housing_loan",
    "has_personal_loan",
    "contact_type",
    "contact_month",
    "prev_campaign_outcome",
]

def _create_preprocessor(df: pd.DataFrame) -> Pipeline:
    feature_engineering_transformer = FunctionTransformer(_add_features)
    known_categories = {feature: df[feature].unique() for feature in CATEGORICAL_FEATURES}
    known_categories["contact_season"] = list(set(MONTH_TO_SEASON.values()))
    categorical_features = list(known_categories.keys())
    categorical_transformer = OneHotEncoder(
        categories=[known_categories[feature] for feature in categorical_features],
        sparse_output=False,
        handle_unknown="error",  # Error when an unknown category is found
    )
    preprocessor = ColumnTransformer(
        transformers=[("cat", categorical_transformer, categorical_features)],
        remainder="passthrough",  # Keep numerical features as-is
        verbose_feature_names_out=False,
    )
    return Pipeline(
        steps=[
            ("feature_engineering", feature_engineering_transformer),
            ("preprocessor", preprocessor),
        ]
    )
The Pipeline allows us to chain multiple transformers into a single transformer that can be used to preprocess the data. We’re handling:
- Categorical features (one-hot encoding)
- Adding a new feature, contact_season, based on contact_month
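To make the chaining concrete, here’s a quick sanity check you could run after building the preprocessor (a hedged sketch; df is the cleaned DataFrame from above):
processor = _create_preprocessor(df)
X = processor.fit_transform(df.drop(columns=["deposit"]))

# With verbose_feature_names_out=False, the one-hot columns come out as
# e.g. "job_admin" or "contact_season_winter" (no "cat__" prefix),
# followed by the passthrough numerical columns.
print(processor.named_steps["preprocessor"].get_feature_names_out())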
Here’s the code for adding the new feature:
MONTH_TO_SEASON = {
    "jan": "winter",
    "feb": "winter",
    "mar": "spring",
    "apr": "spring",
    "may": "spring",
    "jun": "summer",
    "jul": "summer",
    "aug": "summer",
    "sep": "fall",
    "oct": "fall",
    "nov": "fall",
    "dec": "winter",
}

def _add_features(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["contact_season"] = df["contact_month"].map(MONTH_TO_SEASON)
    return df
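A quick sanity check (standalone toy frame, not part of the project code):
sample = pd.DataFrame({"contact_month": ["jan", "jul"]})
print(_add_features(sample)["contact_season"].tolist())  # ['winter', 'summer']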
Let’s add a script that will glue everything together:
#!/usr/bin/env python
from banked.config import Config, configure_logging, seed_everything
from banked.dataset_builder import build_dataset
seed_everything()
configure_logging()
build_dataset(Config.Path.DATA_DIR)
And a stage in our DVC pipeline:
stages:
  build-dataset:
    cmd: bin/build-dataset
    deps:
      - bin/build-dataset
    outs:
      - artefacts/data
Now, let’s run the pipeline:
uv run dvc repro build-dataset
This will run the build-dataset stage and save the processed data (including the preprocessor) in the artefacts/data directory.
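DVC hashes the declared deps and outs, so running the same command again with nothing changed will skip the stage. To check which stages are out of date, use DVC’s status command:
uv run dvc status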
Experiment Tracking
Training a good model comes from having a structured approach to doing experiments. Usually, this means you want to track the following:
- Hyperparameters
- Model performance
  - Train metrics
  - Test metrics
I use MLflow to do the heavy lifting for me. Think of it as your ML project’s journal, automatically documenting every experiment you run. It’s open source, free to use, and comes with a nice UI. Start it by running:
uv run mlflow server --host 127.0.0.1 --port 8080
Go to http://127.0.0.1:8080/ to see the UI. Of course, you can also host it on a cloud provider like AWS, Azure, or GCP. But I prefer to keep it local.
MLflow also comes with a Python client. Let’s see how we can use it.
Training and Evaluation
First, let’s set up the training script and connect it to MLflow:
from pathlib import Path
import joblib
import mlflow
import numpy as np
import pandas as pd
from loguru import logger
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
)
from xgboost import XGBClassifier
from banked.config import Config
mlflow.set_tracking_uri(uri=Config.ML_FLOW_URL)
mlflow.set_experiment("banking-deposit")
Here’s the training code:
PARAMS = {
    "eta": 0.1,
    "max_depth": 6,
    "min_child_weight": 4,
    "n_estimators": 100,
    "random_state": Config.SEED,
}
def train(data_dir: Path, save_dir: Path):
    logger.debug("Model training started...")
    save_dir.mkdir(parents=True, exist_ok=True)
    train_df = pd.read_parquet(data_dir / Config.Dataset.TRAIN_FILE)
    X_train, y_train = train_df.drop(columns=["deposit"]), train_df["deposit"]
    model = XGBClassifier(objective="binary:logistic", eval_metric="auc", **PARAMS)
    with mlflow.start_run():
        mlflow.log_params(PARAMS)
        mlflow.set_tag("model", "XGBClassifier")
        model.fit(X_train, y_train)
        mlflow.log_metric("train_accuracy", accuracy_score(y_train, model.predict(X_train)))
        # XGBClassifier.score() returns accuracy, so compute the train AUC
        # explicitly from the predicted probabilities.
        mlflow.log_metric("train_auc", roc_auc_score(y_train, model.predict_proba(X_train)[:, 1]))
        _evaluate_test(pd.read_parquet(data_dir / Config.Dataset.TEST_FILE), model)
    joblib.dump(model, save_dir / Config.Model.FILE_NAME)
    logger.debug(f"Model saved at {save_dir / Config.Model.FILE_NAME}")
We’re loading the training data and creating an XGBClassifier model. Then we start a run with mlflow.start_run() and log the hyperparameters and the model type.
After training, we evaluate the model on the test set and log the metrics:
def _evaluate_test(test_df: pd.DataFrame, model: XGBClassifier):
    X_test, y_test = test_df.drop(columns=["deposit"]), test_df["deposit"]
    y_test_proba = model.predict_proba(X_test)
    y_pred = np.argmax(y_test_proba, axis=1)
    y_test_scores = y_test_proba[:, 1]
    # normalize="true" turns the confusion matrix counts into per-class rates
    tn, fp, fn, tp = confusion_matrix(y_test, y_pred, normalize="true").ravel()
    mlflow.log_metrics(
        {
            "test_tp": tp,
            "test_fn": fn,
            "test_fp": fp,
            "test_tn": tn,
            "test_accuracy": accuracy_score(y_test, y_pred),
            "test_precision": precision_score(y_test, y_pred, pos_label=1, average="binary"),
            "test_recall": recall_score(y_test, y_pred, pos_label=1, average="binary"),
            "test_f1_score": f1_score(y_test, y_pred, pos_label=1, average="binary"),
            "test_roc_auc": roc_auc_score(y_test, y_test_scores),
        }
    )
Finally, we save the model and log the path to it. Here’s the script that will run the training:
#!/usr/bin/env python
from banked.config import Config, configure_logging, seed_everything
from banked.trainer import train
seed_everything()
configure_logging()
train(Config.Path.DATA_DIR, Config.Path.MODELS_DIR)
And the DVC stage:
  train-model:
    cmd: bin/train-model
    deps:
      - artefacts/data
      - bin/train-model
    outs:
      - artefacts/models
We’re ready to run the training pipeline:
uv run dvc repro train-model
This will train the model and log the metrics in MLflow. Check the MLflow UI to see the results.
Let’s try to run a few experiments and see how they look in MLflow. Change the max_depth parameter to 4 (and 8 after that) and rerun the experiment:
uv run dvc repro train-model -fs
This forces the pipeline to rerun only the train-model stage: -f forces the rerun (DVC can’t detect the change on its own, since PARAMS lives in Python code rather than in a tracked params file) and -s limits it to this single stage. You can see the results in the MLflow UI.
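As a final sanity check, you can load the saved artefacts back and score a few rows. This is a hedged sketch, not part of the tutorial scripts; it assumes _cleanup_dataset is importable from banked.dataset_builder and reuses the Config paths from above:
import joblib
import pandas as pd

from banked.config import Config
from banked.dataset_builder import _cleanup_dataset

processor = joblib.load(Config.Path.DATA_DIR / Config.Model.PROCESSOR_FILE_NAME)
model = joblib.load(Config.Path.MODELS_DIR / Config.Model.FILE_NAME)

df = _cleanup_dataset(pd.read_csv(Config.Path.RAW_DATA_DIR / "bank.csv"))
X = processor.transform(df.drop(columns=["deposit"]))
print(model.predict_proba(X)[:5, 1])  # predicted deposit probabilities for 5 rows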
