Shapash tutorial: industrial batch scoring of Parquet files

This notebook shows an industrial batch-scoring workflow with Shapash:

- train a binary classifier on the Titanic dataset
- build a SmartPredictor
- score a large Parquet dataset chunk by chunk
- write results incrementally to Parquet
- monitor runtime and memory usage

1. Imports

[1]:
import os
import gc
import time
import tracemalloc
from pathlib import Path

import numpy as np
import pandas as pd
from category_encoders import one_hot
from lightgbm import LGBMClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split

import pyarrow as pa
import pyarrow.parquet as pq

from shapash import SmartExplainer
from shapash.data.data_loader import data_loading

2. Load and prepare Titanic data

[2]:
# Load the Titanic sample bundled with Shapash together with its
# feature-label dictionary.
titanic_df, titanic_dict = data_loading("titanic")

# Encode the textual ticket class as an ordinal 1 / 2 / 3.
titanic_df["Pclass"] = titanic_df["Pclass"].map(
    {"First class": 1, "Second class": 2, "Third class": 3}
)

features = ["Pclass", "Age", "Sex", "SibSp", "Parch"]
target = "Survived"

# Keep only the modelling columns and impute numeric gaps with the median.
# NOTE(review): medians are computed on the full dataset before the
# train/test split, a small leak that is tolerable for a tutorial.
df = titanic_df.loc[:, features + [target]].copy()
df = df.fillna({col: df[col].median() for col in ("Age", "Pclass")})

X = df[features]
y = df[target].astype(int).to_frame()

# One-hot encode the only categorical feature; this fitted encoder is the
# preprocessing handed to the SmartExplainer further down.
encoder = one_hot.OneHotEncoder(cols=["Sex"])
X_enc = encoder.fit_transform(X)

# Stratified 80/20 split with a fixed seed for reproducibility.
X_train, X_test, y_train, y_test = train_test_split(
    X_enc, y, stratify=y, test_size=0.2, random_state=79
)

X_train.shape, X_test.shape
[2]:
((712, 6), (179, 6))

3. Train model and build SmartPredictor

[ ]:
# Small, seeded LightGBM classifier (verbose=-1 silences training logs).
model = LGBMClassifier(
    n_estimators=120, max_depth=3, random_state=79, verbose=-1
)
model.fit(X_train, y_train.iloc[:, 0])

y_pred_test = model.predict(X_test)

# Hold-out metrics rounded to 4 decimals, gathered into a named Series.
metrics_df = pd.Series(
    {
        metric_name: round(scorer(y_test.iloc[:, 0], y_pred_test), 4)
        for metric_name, scorer in (
            ("accuracy", accuracy_score),
            ("precision", precision_score),
            ("recall", recall_score),
            ("f1", f1_score),
        )
    },
    name="test_metrics",
)
metrics_df
accuracy     0.8045
precision    0.7656
recall       0.7101
f1           0.7368
Name: test_metrics, dtype: float64
[4]:
# Human-readable names for the two target classes.
response_dict = {0: "Deceased", 1: "Survived"}

xpl = SmartExplainer(
    model=model,
    preprocessing=encoder,
    features_dict=titanic_dict,
    label_dict=response_dict,
    title_story="Titanic batch scoring with parquet",
)

# Compile on the hold-out sample to initialize the explainability
# artifacts the SmartPredictor is exported from.
xpl.compile(
    x=X_test,
    y_pred=pd.Series(y_pred_test, index=X_test.index, name=target).to_frame(),
    y_target=y_test,
)

# Export the scoring object and keep only the top-3 contributions per row
# in its summaries.
predictor = xpl.to_smartpredictor()
predictor.modify_mask(max_contrib=3)
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>

4. Create a larger parquet input dataset

To simulate an industrial-scale batch, we replicate the Titanic rows many times and persist them as a Parquet file.

[5]:
DATA_DIR = Path("./tutorial_artifacts/batch_scoring")
DATA_DIR.mkdir(parents=True, exist_ok=True)

input_parquet_path = DATA_DIR / "titanic_batch_input.parquet"
output_parquet_path = DATA_DIR / "titanic_batch_scored.parquet"

# Tile the raw (pre-encoding) feature frame n_repeats times, then append a
# unique 0..N-1 identifier that is carried through to the scored output.
n_repeats = 150
batch_input_df = (
    X.iloc[np.tile(np.arange(len(X)), n_repeats)]
    .reset_index(drop=True)
    .assign(row_id=lambda frame: np.arange(len(frame)))
)

batch_input_df.to_parquet(input_parquet_path, index=False)

print(f"Input parquet: {input_parquet_path}")
print(f"Rows: {len(batch_input_df):,}")
batch_input_df.head(3)
Input parquet: tutorial_artifacts/batch_scoring/titanic_batch_input.parquet
Rows: 133,650
[5]:
Pclass Age Sex SibSp Parch row_id
0 3 22.0 male 1 0 0
1 1 38.0 female 1 0 1
2 3 26.0 female 0 0 2

5. Batch scoring by parquet chunks

Key industrial choices:

- stream the Parquet input in record batches instead of loading the full dataframe
- score and explain each chunk
- append each scored chunk incrementally to the output Parquet file
- collect chunk-level runtime and memory statistics

[6]:
def to_mb(n_bytes):
    """Convert a byte count to mebibytes (units of 1024 * 1024 bytes)."""
    bytes_per_mib = 1 << 20
    return n_bytes / bytes_per_mib


def normalize_scored_chunk(scored_chunk):
    """Return a copy of *scored_chunk* with Parquet-friendly, chunk-stable dtypes.

    Shapash summaries can hold mixed Python objects (e.g. int and str in the
    same ``value_i`` column), which Parquet cannot store reliably.

    - ``contribution_*`` columns are coerced to float64, so contributions stay
      numeric in the output instead of being stringified.
    - ``feature_*`` / ``value_*`` columns and any other object column become
      pandas ``string``.

    Types are forced by column name rather than by per-chunk inference so that
    every chunk yields the same Arrow schema, which ``pq.ParquetWriter``
    requires.

    Raises:
        ValueError: if a ``contribution_*`` column holds non-numeric text.
    """
    normalized = scored_chunk.copy()
    for col in normalized.columns:
        name = str(col)
        if name.startswith("contribution_"):
            normalized[col] = pd.to_numeric(normalized[col]).astype("float64")
        elif name.startswith(("feature_", "value_")) or normalized[col].dtype == object:
            normalized[col] = normalized[col].astype("string")
    return normalized


def batch_score_parquet(
    predictor_obj,
    input_path,
    output_path,
    feature_cols,
    id_col="row_id",
    chunk_size=5000,
):
    """Score a parquet file chunk by chunk with a Shapash SmartPredictor.

    Streams ``input_path`` in record batches of ``chunk_size`` rows. Each batch
    goes through ``predictor_obj.add_input`` + ``summarize``, is prefixed with
    ``id_col``, tagged with a 1-based ``chunk_id`` and appended to the output.

    Any existing ``output_path`` is deleted up front. Rows are written to
    ``<output_path>.tmp``, which is renamed to ``output_path`` only after every
    chunk succeeded. A failed run therefore never leaves a truncated file that
    looks complete.

    Args:
        predictor_obj: SmartPredictor; its input is replaced on every chunk.
        input_path: parquet file to score.
        output_path: destination parquet file (str or path-like).
        feature_cols: columns fed to the predictor.
        id_col: identifier column copied into the output.
        chunk_size: rows per record batch.

    Returns:
        ``(perf_df, global_metrics)``:

        - ``perf_df`` has one row per chunk: ``chunk_id``, ``rows``,
          ``latency_ms``, ``mem_current_mb`` and ``mem_peak_mb`` (the peak
          within that chunk only).
        - ``global_metrics`` holds run-level totals.

        Memory figures come from ``tracemalloc`` (traced allocations, not
        process RSS). If the caller is already tracing, tracing is left running,
        but its peak counter is reset. If the input has no rows, no output file
        is written and ``perf_df`` is empty (with its columns).
    """
    output_path = os.fspath(output_path)
    tmp_path = f"{output_path}.tmp"
    # Start from a clean slate so a stale result never survives a failed run.
    for stale_path in (output_path, tmp_path):
        if os.path.exists(stale_path):
            os.remove(stale_path)

    parquet_file = pq.ParquetFile(input_path)
    writer = None
    chunk_logs = []
    total_rows = 0
    global_peak_bytes = 0
    succeeded = False

    # Do not hijack (and later stop) a trace the caller is already running.
    started_tracing = not tracemalloc.is_tracing()
    if started_tracing:
        tracemalloc.start()
    t0_global = time.perf_counter()

    try:
        batches = parquet_file.iter_batches(batch_size=chunk_size)
        for chunk_id, record_batch in enumerate(batches, start=1):
            t0_chunk = time.perf_counter()
            # tracemalloc's peak is cumulative: fold it into the global figure,
            # then reset it so mem_peak_mb reflects this chunk only.
            global_peak_bytes = max(global_peak_bytes, tracemalloc.get_traced_memory()[1])
            tracemalloc.reset_peak()

            chunk_df = record_batch.to_pandas()
            x_chunk = chunk_df[feature_cols]

            predictor_obj.add_input(x=x_chunk)
            summary_df = predictor_obj.summarize()

            scored_chunk = pd.concat(
                [chunk_df[[id_col]].reset_index(drop=True), summary_df.reset_index(drop=True)],
                axis=1,
            )
            scored_chunk["chunk_id"] = chunk_id
            scored_chunk = normalize_scored_chunk(scored_chunk)

            table = pa.Table.from_pandas(scored_chunk, preserve_index=False)
            if writer is None:
                writer = pq.ParquetWriter(tmp_path, table.schema)
            elif not table.schema.equals(writer.schema):
                # ParquetWriter rejects tables whose schema differs from the
                # first chunk's; cast any residual dtype drift to that schema.
                table = table.cast(writer.schema)
            writer.write_table(table)

            elapsed_ms = (time.perf_counter() - t0_chunk) * 1000
            current_bytes, chunk_peak_bytes = tracemalloc.get_traced_memory()
            global_peak_bytes = max(global_peak_bytes, chunk_peak_bytes)

            chunk_logs.append(
                {
                    "chunk_id": chunk_id,
                    "rows": len(chunk_df),
                    "latency_ms": round(elapsed_ms, 2),
                    "mem_current_mb": round(to_mb(current_bytes), 2),
                    "mem_peak_mb": round(to_mb(chunk_peak_bytes), 2),
                }
            )

            total_rows += len(chunk_df)
            del chunk_df, x_chunk, summary_df, scored_chunk, table
            gc.collect()
        succeeded = True

    finally:
        if writer is not None:
            writer.close()
        if succeeded and writer is not None:
            os.replace(tmp_path, output_path)
        elif os.path.exists(tmp_path):
            # Failed run: drop the partial file rather than publish it.
            os.remove(tmp_path)
        total_s = time.perf_counter() - t0_global
        global_peak_bytes = max(global_peak_bytes, tracemalloc.get_traced_memory()[1])
        if started_tracing:
            tracemalloc.stop()

    perf_df = pd.DataFrame(
        chunk_logs,
        columns=["chunk_id", "rows", "latency_ms", "mem_current_mb", "mem_peak_mb"],
    )
    global_metrics = {
        "total_rows": total_rows,
        "chunks": len(perf_df),
        "total_runtime_s": round(total_s, 3),
        "rows_per_second": round(total_rows / max(total_s, 1e-9), 2),
        "global_peak_mb": round(to_mb(global_peak_bytes), 2),
        "output_path": str(output_path),
    }
    return perf_df, global_metrics

[7]:
# Score the synthetic batch in chunks of 4000 rows, streaming results to disk.
perf_df, global_metrics = batch_score_parquet(
    predictor_obj=predictor,
    input_path=os.fspath(input_parquet_path),
    output_path=os.fspath(output_parquet_path),
    feature_cols=features,
    chunk_size=4000,
    id_col="row_id",
)

print(global_metrics)
perf_df.head(10)
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
{'total_rows': 133650, 'chunks': 34, 'total_runtime_s': 11.916, 'rows_per_second': 11216.36, 'global_peak_mb': 5.17, 'output_path': 'tutorial_artifacts/batch_scoring/titanic_batch_scored.parquet'}
[7]:
chunk_id rows latency_ms mem_current_mb mem_peak_mb
0 1 4000 388.51 4.20 4.82
1 2 4000 292.95 4.32 5.11
2 3 4000 297.66 4.32 5.12
3 4 4000 296.40 4.32 5.12
4 5 4000 315.24 4.33 5.12
5 6 4000 306.67 4.33 5.13
6 7 4000 311.46 4.33 5.13
7 8 4000 306.44 4.34 5.13
8 9 4000 300.08 4.34 5.13
9 10 4000 295.54 4.34 5.13

6. Read scored parquet and inspect outputs

[8]:
# Read the scored file back to check row count and output schema.
scored_df = pd.read_parquet(output_parquet_path)

print(f"Scored rows: {len(scored_df):,}")
print(f"Columns: {list(scored_df.columns)}")

scored_df.head(5)
Scored rows: 133,650
Columns: ['row_id', 'ypred', 'proba', 'feature_1', 'value_1', 'contribution_1', 'feature_2', 'value_2', 'contribution_2', 'feature_3', 'value_3', 'contribution_3', 'chunk_id']
[8]:
row_id ypred proba feature_1 value_1 contribution_1 feature_2 value_2 contribution_2 feature_3 value_3 contribution_3 chunk_id
0 0 Deceased 0.919866 Sex male 1.0349050232018024 Ticket class 3 0.6038758628502086 Age 22.0 0.3174027447109047 1
1 1 Survived 0.963956 Sex female 2.369341374897147 Ticket class 1 1.9256231382751856 Age 38.0 -0.6158891469343937 1
2 2 Survived 0.723818 Sex female 1.826965866857368 Ticket class 3 -1.055174618975645 Age 26.0 0.3998616351305038 1
3 3 Survived 0.990708 Sex female 2.4174548644071776 Ticket class 1 2.0805592032295315 Age 35.0 0.5622844591058956 1
4 4 Deceased 0.860486 Sex male 1.023258463786566 Ticket class 3 0.6412293719558009 Age 35.0 -0.1650029126172407 1
[9]:
# Condense the per-chunk log into a few headline figures.
perf_summary = pd.Series(
    dict(
        rows_total=int(perf_df.rows.sum()),
        latency_ms_mean=float(perf_df.latency_ms.mean()),
        latency_ms_p95=float(perf_df.latency_ms.quantile(0.95)),
        peak_mb_max=float(perf_df.mem_peak_mb.max()),
    )
)
perf_summary
[9]:
rows_total         133650.000000
latency_ms_mean       306.115588
latency_ms_p95        356.844000
peak_mb_max             5.170000
dtype: float64

7. Optional: compare with monolithic scoring

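Run this baseline on medium-sized datasets only: it loads the whole input into memory at once. Comparing its peak memory with the chunked run shows why chunking is safer on memory. Note that the baseline writes no output file, so its runtime is not strictly comparable to the chunked pipeline's.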

[10]:
def score_monolithic(predictor_obj, input_path, feature_cols):
    """Score a whole parquet file in a single pass, as a memory/runtime baseline.

    Loads every row of ``input_path`` into memory, then calls
    ``predictor_obj.add_input`` and ``summarize`` once. Unlike the chunked
    pipeline, it writes no output file, so its runtime excludes parquet
    writing.

    Args:
        predictor_obj: SmartPredictor; its input is replaced by the full file.
        input_path: parquet file to score.
        feature_cols: columns fed to the predictor.

    Returns:
        ``(metrics, preview)``:

        - ``metrics`` has ``rows``, ``runtime_s`` and ``peak_mb``. ``peak_mb``
          is the tracemalloc peak, not process RSS.
        - ``preview`` is the first 3 rows of the summary.

        If the caller is already tracing, tracing is left running, but its peak
        counter is reset.
    """
    started_tracing = not tracemalloc.is_tracing()
    if started_tracing:
        tracemalloc.start()
    else:
        # Measure only this call's peak without stopping the caller's trace.
        tracemalloc.reset_peak()
    t0 = time.perf_counter()

    try:
        df_all = pd.read_parquet(input_path)
        predictor_obj.add_input(x=df_all[feature_cols])
        summary_df = predictor_obj.summarize()

        elapsed_s = time.perf_counter() - t0
        _, peak = tracemalloc.get_traced_memory()
    finally:
        # Stop tracing we started even if scoring fails; otherwise tracing
        # stays on for the rest of the session.
        if started_tracing:
            tracemalloc.stop()

    return {
        "rows": len(df_all),
        "runtime_s": round(elapsed_s, 3),
        "peak_mb": round(to_mb(peak), 2),
    }, summary_df.head(3)


# Baseline: score the same input in one pass, then compare with the chunked run.
mono_metrics, mono_preview = score_monolithic(
    predictor_obj=predictor,
    input_path=os.fspath(input_parquet_path),
    feature_cols=features,
)

print("Monolithic metrics:", mono_metrics)
print("Chunked global metrics:", global_metrics)
mono_preview
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
Monolithic metrics: {'rows': 133650, 'runtime_s': 8.232, 'peak_mb': 122.85}
Chunked global metrics: {'total_rows': 133650, 'chunks': 34, 'total_runtime_s': 11.916, 'rows_per_second': 11216.36, 'global_peak_mb': 5.17, 'output_path': 'tutorial_artifacts/batch_scoring/titanic_batch_scored.parquet'}
[10]:
ypred proba feature_1 value_1 contribution_1 feature_2 value_2 contribution_2 feature_3 value_3 contribution_3
0 Deceased 0.919866 Sex male 1.034905 Ticket class 3 0.603876 Age 22.0 0.317403
1 Survived 0.963956 Sex female 2.369341 Ticket class 1 1.925623 Age 38.0 -0.615889
2 Survived 0.723818 Sex female 1.826966 Ticket class 3 -1.055175 Age 26.0 0.399862

8. Takeaways

  • Use ParquetFile.iter_batches to stream the input data in chunks.

  • Keep chunk_size tunable to trade off throughput vs memory usage.

  • Write each scored chunk directly to parquet with ParquetWriter.

  • For production, persist the SmartPredictor with predictor.save(...) and reload it in your batch job.