ML scoring by region (pandas + scikit-learn)
Data prep feeds a scikit-learn classifier (wrapped with @cf.opaque); the predicted churn probability is then analysed by UK nation.
Example code
Source: examples/example_ml_region_pandas.py
"""Pandas + scikit-learn: data prep -> classifier -> analyse predictions by region.
Shows how a feature-engineering pipeline feeds a machine-learning model and how the
model's *output* is then used. A churn classifier produces a probability per
customer, and we analyse the predicted probability against the customer's UK nation
(England, Scotland, Wales, Northern Ireland).
The model itself is wrapped with ``@cf.opaque`` so it appears as a single connected
node: input features in, scored frame out, with the model's internals (encoding,
scaling, logistic regression) kept out of the lineage. This is the pandas equivalent
of the automatic ``pyspark.ml`` opaqueness in ``example_ml_region_spark.py``.
Why ``@cf.opaque`` rather than the opaque-module list? The module-based auto-opaque
hook is wired to PySpark's single ``Transformer.transform`` chokepoint, which returns
a DataFrame. scikit-learn has no equivalent single chokepoint and ``predict_proba``
returns a NumPy array, so the idiomatic way to contain it is to wrap the scoring step.
Run: python examples/example_ml_region_pandas.py
Then open output/ml_region_pandas_report.html.
"""
import os
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
import conformare as cf
# Category values for the customer "region" column (the four UK nations).
REGIONS = [
    "England",
    "Scotland",
    "Wales",
    "Northern Ireland",
]

# Raw numeric customer columns; these are scaled by the model and histogrammed
# by the profiling configuration.
NUMERIC = [
    "age",
    "tenure_months",
    "monthly_spend",
    "num_products",
]

# Columns handed to the classifier for both fitting and scoring: the raw
# numerics, the derived spend-per-product ratio, and the region category.
FEATURES = [*NUMERIC, "spend_per_product", "region"]
def _make_data(seed: int, n: int = 600) -> pd.DataFrame:
    """Generate a reproducible synthetic customer table with a churn label.

    The churn probability is a logistic function of monthly spend (higher spend
    raises it), tenure (longer tenure lowers it) and a small per-region offset.

    Args:
        seed: Seed passed to ``np.random.default_rng``.
        n: Number of customer rows to generate.

    Returns:
        A frame with the columns ``region``, ``age``, ``tenure_months``,
        ``monthly_spend``, ``num_products`` and a 0/1 ``churned`` label.
    """
    gen = np.random.default_rng(seed)

    # NOTE: the order of the draws below determines the data produced for a
    # given seed; keep it fixed.
    nations = gen.choice(REGIONS, size=n, p=[0.55, 0.18, 0.15, 0.12])
    ages = gen.integers(18, 80, n)
    tenures = gen.integers(1, 72, n)
    spends = np.round(np.clip(gen.normal(50, 20, n), 5, 200), 2)
    product_counts = gen.integers(1, 5, n)

    # Additive log-odds offset for each nation.
    nation_shift = {"England": 0.0, "Scotland": 0.1, "Wales": 0.15, "Northern Ireland": -0.05}
    region_term = np.array([nation_shift[name] for name in nations])

    spend_term = (spends - 50) / 40.0
    tenure_term = (tenures - 36) / 40.0
    log_odds = -1.5 + spend_term - tenure_term + region_term

    # Sigmoid of the log-odds, then one uniform draw per row decides the label.
    churn_prob = 1.0 / (1.0 + np.exp(-log_odds))
    labels = (gen.random(n) < churn_prob).astype(int)

    columns = {
        "region": nations,
        "age": ages,
        "tenure_months": tenures,
        "monthly_spend": spends,
        "num_products": product_counts,
        "churned": labels,
    }
    return pd.DataFrame(columns)
def _build_model() -> Pipeline:
    """Assemble the unfitted churn classifier.

    Returns:
        A two-step scikit-learn ``Pipeline``: a ``"prep"`` step that one-hot
        encodes ``region`` and standard-scales the numeric columns (including
        ``spend_per_product``), followed by a ``"clf"`` logistic regression.
    """
    scaled_columns = NUMERIC + ["spend_per_product"]

    # handle_unknown="ignore": a region value not seen during fitting should not
    # make scoring fail (see the scikit-learn OneHotEncoder documentation).
    region_encoder = OneHotEncoder(handle_unknown="ignore")

    preprocess = ColumnTransformer(
        transformers=[
            ("region", region_encoder, ["region"]),
            ("num", StandardScaler(), scaled_columns),
        ]
    )
    classifier = LogisticRegression(max_iter=1000)
    return Pipeline(steps=[("prep", preprocess), ("clf", classifier)])
def main(out=None):
    """Run the tracked churn-scoring example and write the HTML report.

    Trains the classifier on one synthetic batch (untracked), then, with pandas
    tracking enabled, prepares a second batch, scores it through the opaque
    model wrapper, aggregates the predicted churn probability per region and
    renders the report.

    Args:
        out: Path of the HTML report to write. When falsy, defaults to
            ``output/ml_region_pandas_report.html`` under the parent of the
            directory containing this file.

    Returns:
        The per-region summary frame with the columns ``customers`` and
        ``mean_churn_proba``.
    """
    out = out or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        "output",
        "ml_region_pandas_report.html",
    )
    # A bare filename such as "report.html" has an empty dirname, and
    # os.makedirs("") raises FileNotFoundError; only create a directory when
    # the path actually names one.
    out_dir = os.path.dirname(out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # 1. Train the model on historical data BEFORE tracking, so training-time
    #    DataFrame access does not appear in the scoring pipeline's lineage.
    model = _build_model()
    train = _make_data(seed=1).assign(
        spend_per_product=lambda d: d["monthly_spend"] / d["num_products"]
    )
    model.fit(train[FEATURES], train["churned"])

    # 2. Track the scoring pipeline we want to document.
    cf.trackPandas()
    try:
        cf.set_profiles(
            {
                "*": [
                    cf.rowCount,
                    cf.dataSize,
                    cf.histogram(columns=NUMERIC),
                    cf.nullFraction(columns="all"),
                ]
            }
        )
        cf.describe_process(
            "Customer churn scoring: prepare features, apply a trained classifier, then "
            "analyse predicted churn probability by UK nation."
        )

        batch = _make_data(seed=2)

        with cf.describe(
            "Prepare features",
            purpose="Keep scorable rows and derive features for the model",
        ):
            adults = batch[batch["age"] >= 18]  # boolean index -> filter
            prepared = adults.assign(
                spend_per_product=adults["monthly_spend"] / adults["num_products"]
            )

        @cf.opaque
        def score_churn(df):
            """Apply the trained scikit-learn pipeline and attach P(churn) as a column.

            Opaque: the model (one-hot encoding, scaling, logistic regression) is a black
            box to the lineage -- only that it was applied, and the columns it produced,
            are recorded.
            """
            # Column 1 of predict_proba is the probability of the positive (churned) class.
            proba = model.predict_proba(df[FEATURES])[:, 1]
            return df.assign(churn_proba=proba)

        with cf.describe(
            "Score churn risk",
            purpose="Predicted probability of churn per customer",
            risks=cf.risk(
                "fairness.proxy_variable",
                note="region is a model feature and may proxy for protected attributes",
                mitigation="monitor per-region calibration and review feature importance",
                owner="ml-governance",
            ),
        ):
            scored = score_churn(prepared)

        with cf.describe(
            "Churn risk by region",
            purpose="Average predicted churn probability per UK nation",
        ):
            by_region = scored.groupby("region").agg(
                customers=("churn_proba", "size"),
                mean_churn_proba=("churn_proba", "mean"),
            )

        html = cf.to_html(out, title="Churn scoring by region (pandas + scikit-learn)")
    finally:
        # cf.restore() is the counterpart of cf.trackPandas() above (presumably
        # removing the tracking hooks); run it even when scoring or report
        # generation fails so tracking is not left active for the caller.
        cf.restore()

    print(f"wrote {out} ({len(html):,} bytes)")
    print(by_region.round(3).to_string())
    return by_region
# Script entry point: generate the report at the default output path when this
# file is executed directly rather than imported.
if __name__ == "__main__":
    main()