Shapash tutorial: industrial batch scoring of parquet files
This notebook shows an industrial batch workflow with Shapash:
- train a binary classifier on the Titanic dataset
- build a SmartPredictor
- score a large parquet dataset chunk by chunk
- write the results incrementally to parquet
- monitor runtime and memory behavior
1. Imports
[1]:
import os
import gc
import time
import tracemalloc
from pathlib import Path
import numpy as np
import pandas as pd
from category_encoders import one_hot
from lightgbm import LGBMClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
import pyarrow as pa
import pyarrow.parquet as pq
from shapash import SmartExplainer
from shapash.data.data_loader import data_loading
2. Load and prepare the Titanic data
[2]:
# Load the Titanic dataset shipped with Shapash, plus its feature-label dictionary.
titanic_df, titanic_dict = data_loading("titanic")

# Ticket class arrives as text; turn it into an ordinal 1..3 for the model.
pclass_codes = {"First class": 1, "Second class": 2, "Third class": 3}
titanic_df["Pclass"] = titanic_df["Pclass"].map(pclass_codes)

features = ["Pclass", "Age", "Sex", "SibSp", "Parch"]
target = "Survived"

df = titanic_df[features + [target]].copy()
# Median-impute any missing values in the two numeric columns (Age first, then Pclass).
for col in ("Age", "Pclass"):
    df[col] = df[col].fillna(df[col].median())

X = df[features]
y = df[target].astype(int).to_frame()

# One-hot encode the only categorical feature; this fitted encoder is later
# handed to SmartExplainer as its preprocessing step.
encoder = one_hot.OneHotEncoder(cols=["Sex"])
X_enc = encoder.fit_transform(X)

split = train_test_split(X_enc, y, test_size=0.2, random_state=79, stratify=y)
X_train, X_test, y_train, y_test = split
X_train.shape, X_test.shape
[2]:
((712, 6), (179, 6))
3. Train the model and build a SmartPredictor
[ ]:
# Shallow boosted trees; verbose=-1 suppresses LightGBM's log output.
model = LGBMClassifier(max_depth=3, n_estimators=120, random_state=79, verbose=-1)
y_train_true = y_train.iloc[:, 0]
model.fit(X_train, y_train_true)

y_pred_test = model.predict(X_test)
y_test_true = y_test.iloc[:, 0]

# Evaluate every metric on the same held-out labels, rounded to 4 decimals.
metric_fns = {
    "accuracy": accuracy_score,
    "precision": precision_score,
    "recall": recall_score,
    "f1": f1_score,
}
metrics_df = pd.Series(
    {name: round(fn(y_test_true, y_pred_test), 4) for name, fn in metric_fns.items()},
    name="test_metrics",
)
metrics_df
accuracy 0.8045
precision 0.7656
recall 0.7101
f1 0.7368
Name: test_metrics, dtype: float64
[4]:
response_dict = {0: "Deceased", 1: "Survived"}
explainer_kwargs = dict(
    model=model,
    preprocessing=encoder,
    features_dict=titanic_dict,
    label_dict=response_dict,
    title_story="Titanic batch scoring with parquet",
)
xpl = SmartExplainer(**explainer_kwargs)

# Compile on the held-out test split so the explainer's artifacts exist
# before it is converted into a SmartPredictor.
y_pred_frame = pd.DataFrame(y_pred_test, index=X_test.index, columns=[target])
xpl.compile(x=X_test, y_pred=y_pred_frame, y_target=y_test)

predictor = xpl.to_smartpredictor()
# Limit summarize() output to 3 contributions per row (feature_1..feature_3).
predictor.modify_mask(max_contrib=3)
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
4. Create a larger parquet input dataset
To simulate an industrial batch, we replicate the Titanic rows 150 times and save them as a single parquet file.
[5]:
DATA_DIR = Path("./tutorial_artifacts/batch_scoring")
DATA_DIR.mkdir(parents=True, exist_ok=True)
input_parquet_path = DATA_DIR.joinpath("titanic_batch_input.parquet")
output_parquet_path = DATA_DIR.joinpath("titanic_batch_scored.parquet")

# Stack the raw (un-encoded) feature rows n_repeats times and tag each row
# with a unique, contiguous identifier appended as the last column.
n_repeats = 150
batch_input_df = pd.concat([X] * n_repeats, ignore_index=True).assign(
    row_id=lambda frame: np.arange(len(frame))
)
batch_input_df.to_parquet(input_parquet_path, index=False)

print(f"Input parquet: {input_parquet_path}")
print(f"Rows: {len(batch_input_df):,}")
batch_input_df.head(3)
Input parquet: tutorial_artifacts/batch_scoring/titanic_batch_input.parquet
Rows: 133,650
[5]:
| Pclass | Age | Sex | SibSp | Parch | row_id | |
|---|---|---|---|---|---|---|
| 0 | 3 | 22.0 | male | 1 | 0 | 0 |
| 1 | 1 | 38.0 | female | 1 | 0 | 1 |
| 2 | 3 | 26.0 | female | 0 | 0 | 2 |
5. Batch scoring, parquet chunk by chunk
Key industrial choices:
- stream the parquet file in record batches (the full dataframe is never loaded)
- score and explain each chunk
- append the scored results incrementally to the output parquet file
- collect chunk-level runtime and memory statistics
[6]:
def to_mb(n_bytes):
    """Return ``n_bytes`` expressed in mebibytes (1 MiB = 1024 ** 2 bytes)."""
    bytes_per_mib = 1 << 20
    return n_bytes / bytes_per_mib
def _normalize_for_parquet(frame):
    """Make the object columns of a scored chunk serializable by pyarrow (in place).

    - ``contribution_*`` columns hold numeric contribution scores; they are
      converted with ``pd.to_numeric`` so they are written as numbers, not text.
    - Every other object column (e.g. ``value_i``, which can mix int and str
      across rows) is cast to pandas' ``string`` dtype, because parquet cannot
      reliably serialize mixed Python-object columns.

    Returns the same (mutated) DataFrame for convenience.
    """
    for col in frame.select_dtypes(include=["object"]).columns:
        if str(col).startswith("contribution"):
            frame[col] = pd.to_numeric(frame[col])
        else:
            frame[col] = frame[col].astype("string")
    return frame


def batch_score_parquet(
    predictor_obj,
    input_path,
    output_path,
    feature_cols,
    id_col="row_id",
    chunk_size=5000,
):
    """Score a parquet file chunk by chunk and stream the results to a new parquet file.

    Each record batch of ``input_path`` is converted to pandas. Its
    ``feature_cols`` are passed to ``predictor_obj.add_input`` and the result of
    ``predictor_obj.summarize()`` is joined with ``id_col`` and a ``chunk_id``
    column. The joined frame is appended to ``output_path`` through a single
    ``pq.ParquetWriter``. Any existing file at ``output_path`` is deleted first.

    Parameters
    ----------
    predictor_obj : object with ``add_input(x=...)`` and ``summarize()``
        A Shapash SmartPredictor in this notebook.
    input_path : str or path-like
        Parquet file to score.
    output_path : str or path-like
        Destination parquet file (overwritten).
    feature_cols : list of str
        Columns passed to the predictor.
    id_col : str, default "row_id"
        Identifier column copied through to the output.
    chunk_size : int, default 5000
        Maximum number of rows per record batch.

    Returns
    -------
    perf_df : pandas.DataFrame
        One row per chunk: ``chunk_id``, ``rows``, ``latency_ms``,
        ``mem_current_mb`` and ``mem_peak_mb``. ``mem_peak_mb`` is the peak
        traced memory during that chunk only.
    global_metrics : dict
        ``total_rows``, ``chunks``, ``total_runtime_s``, ``rows_per_second``,
        ``global_peak_mb`` (peak traced memory over the whole run) and
        ``output_path``.

    Notes
    -----
    Memory figures come from ``tracemalloc``. It only counts allocations made
    through Python's allocator hooks; memory held in Arrow's native pool is not
    included. If tracemalloc is already running, it is left running, but its
    peak counter is reset. If the input yields no batches, no output file is
    written.
    """
    # Start from a clean output; missing_ok avoids an exists()/remove() race.
    Path(output_path).unlink(missing_ok=True)

    parquet_file = pq.ParquetFile(input_path)
    writer = None
    chunk_logs = []
    total_rows = 0
    global_peak_bytes = 0

    # Only stop the tracer at the end if this function started it.
    owns_tracer = not tracemalloc.is_tracing()
    if owns_tracer:
        tracemalloc.start()
    t0_global = time.perf_counter()
    try:
        batches = parquet_file.iter_batches(batch_size=chunk_size)
        for chunk_id, record_batch in enumerate(batches, start=1):
            t0_chunk = time.perf_counter()
            # Fold the running peak into the global figure, then reset it so that
            # mem_peak_mb measures this chunk alone instead of a running maximum.
            _, peak_so_far = tracemalloc.get_traced_memory()
            global_peak_bytes = max(global_peak_bytes, peak_so_far)
            tracemalloc.reset_peak()

            chunk_df = record_batch.to_pandas()
            x_chunk = chunk_df[feature_cols]
            predictor_obj.add_input(x=x_chunk)
            summary_df = predictor_obj.summarize()

            # Assumes summarize() returns rows in input order; indexes are reset
            # so that concat pairs the rows positionally.
            scored_chunk = pd.concat(
                [chunk_df[[id_col]].reset_index(drop=True), summary_df.reset_index(drop=True)],
                axis=1,
            )
            scored_chunk["chunk_id"] = chunk_id
            scored_chunk = _normalize_for_parquet(scored_chunk)

            table = pa.Table.from_pandas(scored_chunk, preserve_index=False)
            if writer is None:
                writer = pq.ParquetWriter(output_path, table.schema)
            else:
                # Type inference is per chunk and can differ between chunks;
                # align with the writer's schema (safe cast still raises on loss).
                table = table.cast(writer.schema)
            writer.write_table(table)

            elapsed_ms = (time.perf_counter() - t0_chunk) * 1000
            current_bytes, chunk_peak_bytes = tracemalloc.get_traced_memory()
            global_peak_bytes = max(global_peak_bytes, chunk_peak_bytes)
            chunk_logs.append(
                {
                    "chunk_id": chunk_id,
                    "rows": len(chunk_df),
                    "latency_ms": round(elapsed_ms, 2),
                    "mem_current_mb": round(to_mb(current_bytes), 2),
                    "mem_peak_mb": round(to_mb(chunk_peak_bytes), 2),
                }
            )
            total_rows += len(chunk_df)

            # Drop references before the next chunk to keep peak memory bounded.
            del chunk_df, x_chunk, summary_df, scored_chunk, table
            gc.collect()
    finally:
        if writer is not None:
            writer.close()
        total_s = time.perf_counter() - t0_global
        # Capture the tail (including writer.close) before the tracer may stop.
        _, tail_peak_bytes = tracemalloc.get_traced_memory()
        global_peak_bytes = max(global_peak_bytes, tail_peak_bytes)
        if owns_tracer:
            tracemalloc.stop()

    perf_df = pd.DataFrame(chunk_logs)
    global_metrics = {
        "total_rows": total_rows,
        "chunks": len(perf_df),
        "total_runtime_s": round(total_s, 3),
        "rows_per_second": round(total_rows / max(total_s, 1e-9), 2),
        "global_peak_mb": round(to_mb(global_peak_bytes), 2),
        "output_path": str(output_path),
    }
    return perf_df, global_metrics
[7]:
# Score the replicated dataset 4,000 rows at a time.
scoring_config = {
    "input_path": str(input_parquet_path),
    "output_path": str(output_parquet_path),
    "feature_cols": features,
    "id_col": "row_id",
    "chunk_size": 4000,
}
perf_df, global_metrics = batch_score_parquet(predictor, **scoring_config)
print(global_metrics)
perf_df.head(10)
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
{'total_rows': 133650, 'chunks': 34, 'total_runtime_s': 11.916, 'rows_per_second': 11216.36, 'global_peak_mb': 5.17, 'output_path': 'tutorial_artifacts/batch_scoring/titanic_batch_scored.parquet'}
[7]:
| chunk_id | rows | latency_ms | mem_current_mb | mem_peak_mb | |
|---|---|---|---|---|---|
| 0 | 1 | 4000 | 388.51 | 4.20 | 4.82 |
| 1 | 2 | 4000 | 292.95 | 4.32 | 5.11 |
| 2 | 3 | 4000 | 297.66 | 4.32 | 5.12 |
| 3 | 4 | 4000 | 296.40 | 4.32 | 5.12 |
| 4 | 5 | 4000 | 315.24 | 4.33 | 5.12 |
| 5 | 6 | 4000 | 306.67 | 4.33 | 5.13 |
| 6 | 7 | 4000 | 311.46 | 4.33 | 5.13 |
| 7 | 8 | 4000 | 306.44 | 4.34 | 5.13 |
| 8 | 9 | 4000 | 300.08 | 4.34 | 5.13 |
| 9 | 10 | 4000 | 295.54 | 4.34 | 5.13 |
6. Read the scored parquet and inspect the outputs
[8]:
# Reload the streamed output in one go; fine here because it is a sanity check.
scored_df = pd.read_parquet(output_parquet_path)
n_scored = len(scored_df)
print(f"Scored rows: {n_scored:,}")
print(f"Columns: {list(scored_df.columns)}")
scored_df.head(5)
Scored rows: 133,650
Columns: ['row_id', 'ypred', 'proba', 'feature_1', 'value_1', 'contribution_1', 'feature_2', 'value_2', 'contribution_2', 'feature_3', 'value_3', 'contribution_3', 'chunk_id']
[8]:
| row_id | ypred | proba | feature_1 | value_1 | contribution_1 | feature_2 | value_2 | contribution_2 | feature_3 | value_3 | contribution_3 | chunk_id | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | Deceased | 0.919866 | Sex | male | 1.0349050232018024 | Ticket class | 3 | 0.6038758628502086 | Age | 22.0 | 0.3174027447109047 | 1 |
| 1 | 1 | Survived | 0.963956 | Sex | female | 2.369341374897147 | Ticket class | 1 | 1.9256231382751856 | Age | 38.0 | -0.6158891469343937 | 1 |
| 2 | 2 | Survived | 0.723818 | Sex | female | 1.826965866857368 | Ticket class | 3 | -1.055174618975645 | Age | 26.0 | 0.3998616351305038 | 1 |
| 3 | 3 | Survived | 0.990708 | Sex | female | 2.4174548644071776 | Ticket class | 1 | 2.0805592032295315 | Age | 35.0 | 0.5622844591058956 | 1 |
| 4 | 4 | Deceased | 0.860486 | Sex | male | 1.023258463786566 | Ticket class | 3 | 0.6412293719558009 | Age | 35.0 | -0.1650029126172407 | 1 |
[9]:
# Aggregate the per-chunk log into a few headline numbers.
latency_ms = perf_df["latency_ms"]
perf_summary = pd.Series(
    {
        "rows_total": int(perf_df["rows"].sum()),
        "latency_ms_mean": float(latency_ms.mean()),
        "latency_ms_p95": float(latency_ms.quantile(0.95)),
        "peak_mb_max": float(perf_df["mem_peak_mb"].max()),
    }
)
perf_summary
[9]:
rows_total 133650.000000
latency_ms_mean 306.115588
latency_ms_p95 356.844000
peak_mb_max 5.170000
dtype: float64
7. Optional: compare with monolithic scoring
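Run this baseline only on datasets that fit comfortably in memory. It loads and scores everything in one pass, which shows the memory cost that chunking avoids. The single pass is usually somewhat faster, but its peak memory grows with the size of the whole dataset instead of with `chunk_size`.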
[10]:
def score_monolithic(predictor_obj, input_path, feature_cols):
    """Score a whole parquet file in one pass, as a runtime/memory baseline.

    Loads ``input_path`` entirely with ``pd.read_parquet``, passes its
    ``feature_cols`` to ``predictor_obj.add_input`` and calls ``summarize()`` once.

    Parameters
    ----------
    predictor_obj : object with ``add_input(x=...)`` and ``summarize()``
        A Shapash SmartPredictor in this notebook.
    input_path : str or path-like
        Parquet file to score.
    feature_cols : list of str
        Columns passed to the predictor.

    Returns
    -------
    metrics : dict
        ``rows``, ``runtime_s`` and ``peak_mb``. ``peak_mb`` is the peak memory
        traced by ``tracemalloc`` during the call; Arrow's native pool is not
        counted.
    preview : pandas.DataFrame
        The first three rows of the summary.

    Notes
    -----
    If tracemalloc is already running it is left running, but its peak counter
    is reset so that ``peak_mb`` reflects this call.
    """
    # Only stop the tracer at the end if this function started it; do so even on error.
    owns_tracer = not tracemalloc.is_tracing()
    if owns_tracer:
        tracemalloc.start()
    tracemalloc.reset_peak()
    try:
        t0 = time.perf_counter()
        df_all = pd.read_parquet(input_path)
        predictor_obj.add_input(x=df_all[feature_cols])
        summary_df = predictor_obj.summarize()
        elapsed_s = time.perf_counter() - t0
        _, peak = tracemalloc.get_traced_memory()
    finally:
        if owns_tracer:
            tracemalloc.stop()

    return {
        "rows": len(df_all),
        "runtime_s": round(elapsed_s, 3),
        "peak_mb": round(to_mb(peak), 2),
    }, summary_df.head(3)
mono_metrics, mono_preview = score_monolithic(
    predictor, str(input_parquet_path), features
)

# Print both runs side by side for a quick comparison.
comparison = (
    ("Monolithic metrics:", mono_metrics),
    ("Chunked global metrics:", global_metrics),
)
for label, metrics in comparison:
    print(label, metrics)
mono_preview
INFO: Shap explainer type - <shap.explainers._tree.TreeExplainer object at 0x11f9ff0e0>
Monolithic metrics: {'rows': 133650, 'runtime_s': 8.232, 'peak_mb': 122.85}
Chunked global metrics: {'total_rows': 133650, 'chunks': 34, 'total_runtime_s': 11.916, 'rows_per_second': 11216.36, 'global_peak_mb': 5.17, 'output_path': 'tutorial_artifacts/batch_scoring/titanic_batch_scored.parquet'}
[10]:
| ypred | proba | feature_1 | value_1 | contribution_1 | feature_2 | value_2 | contribution_2 | feature_3 | value_3 | contribution_3 | |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Deceased | 0.919866 | Sex | male | 1.034905 | Ticket class | 3 | 0.603876 | Age | 22.0 | 0.317403 |
| 1 | Survived | 0.963956 | Sex | female | 2.369341 | Ticket class | 1 | 1.925623 | Age | 38.0 | -0.615889 |
| 2 | Survived | 0.723818 | Sex | female | 1.826966 | Ticket class | 3 | -1.055175 | Age | 26.0 | 0.399862 |
8. Takeaways
- Use `ParquetFile.iter_batches` to stream the input data chunk by chunk.
- Keep `chunk_size` tunable to trade off throughput against memory usage.
- Write each scored chunk directly to parquet with `ParquetWriter`.
- For production, persist the SmartPredictor with `predictor.save(...)` and reload it in your batch job.