ML scoring by region (PySpark + spark.ml)
The same scenario as the pandas example, run on PySpark: the spark.ml feature transformers and the classifier are auto-opaque, so the lineage keeps its prep -> features -> model -> analysis shape.
Example code
Source: examples/example_ml_region_spark.py
"""PySpark + spark.ml: data prep -> classifier -> analyse predictions by region.
The same scenario as ``example_ml_region_pandas.py``, on PySpark. A feature-
engineering pipeline (index + one-hot encode region, assemble, scale) feeds a
logistic-regression classifier; we extract the predicted churn probability and
analyse it against the customer's UK nation (England, Scotland, Wales, Northern
Ireland).
Here **auto-opaque does the work**: ``pyspark.ml`` is opaque by default, so every
feature transformer and the classifier's ``.transform()`` is recorded as a single
connected node (input frame in, output frame out) with its internals suppressed.
You see the shape of the pipeline -- prep -> features -> model -> analysis -- without
the ML internals exploding or orphaning the lineage. Contrast with the pandas
example, where the scikit-learn model is contained with ``@cf.opaque`` instead.
Run: python examples/example_ml_region_spark.py
Then open output/ml_region_spark_report.html.
"""
import os
import numpy as np
import pandas as pd
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoder, StandardScaler, StringIndexer, VectorAssembler
from pyspark.ml.functions import vector_to_array
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import conformare as cf
REGIONS = ["England", "Scotland", "Wales", "Northern Ireland"]
NUMERIC = ["age", "tenure_months", "monthly_spend", "num_products", "spend_per_product"]
def _make_pdf(seed: int, n: int = 600) -> pd.DataFrame:
"""Synthetic customer table with a churn signal that varies slightly by region."""
rng = np.random.default_rng(seed)
region = rng.choice(REGIONS, size=n, p=[0.55, 0.18, 0.15, 0.12])
age = rng.integers(18, 80, n)
tenure = rng.integers(1, 72, n)
spend = rng.normal(50, 20, n).clip(5, 200).round(2)
products = rng.integers(1, 5, n)
region_effect = {"England": 0.0, "Scotland": 0.1, "Wales": 0.15, "Northern Ireland": -0.05}
logit = (
-1.5
+ (spend - 50) / 40.0
- (tenure - 36) / 40.0
+ np.array([region_effect[r] for r in region])
)
churned = (rng.random(n) < 1.0 / (1.0 + np.exp(-logit))).astype(int)
return pd.DataFrame(
{
"region": region,
"age": age.astype("int64"),
"tenure_months": tenure.astype("int64"),
"monthly_spend": spend,
"num_products": products.astype("int64"),
"churned": churned.astype("int64"),
}
)
def main(out=None):
    """Run the churn-scoring example end to end and write the lineage report.

    Steps:

    * Start (or reuse, via ``getOrCreate``) a single-threaded local Spark
      session and enable conformare Spark tracking.
    * Engineer features with spark.ml transformers, then fit a logistic
      regression on the prepared frame and score it.
    * Aggregate the customer count and mean predicted churn probability per
      region.
    * Write the HTML report to ``out`` and print a per-region summary.

    Cleanup (``cf.restore()`` and ``spark.stop()``) runs even if a step fails.

    Args:
        out: Path of the HTML report. When falsy, defaults to
            ``output/ml_region_spark_report.html`` under the directory above
            the one containing this file.

    Returns:
        The path the report was written to.
    """
    out = out or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        "output",
        "ml_region_spark_report.html",
    )
    # A bare filename has no directory part, and os.makedirs("") would raise.
    out_dir = os.path.dirname(out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    spark = (
        SparkSession.builder.master("local[1]")
        .appName("conformare-ml-region")
        .config("spark.ui.enabled", "false")
        .config("spark.sql.shuffle.partitions", "4")
        .getOrCreate()
    )
    try:
        spark.sparkContext.setLogLevel("ERROR")
        cf.trackSpark()  # pyspark.ml is opaque by default
        # Pair trackSpark() with restore() on every exit path, not only on success.
        try:
            cf.set_profiles({"*": [cf.rowCount, cf.dataSize]})
            cf.describe_process(
                "Customer churn scoring on PySpark: engineer features with spark.ml, apply a "
                "logistic-regression classifier, then analyse predicted churn probability by UK nation."
            )
            sdf = spark.createDataFrame(_make_pdf(seed=2))
            with cf.describe("Prepare features", purpose="Keep scorable rows and derive features"):
                adults = sdf.filter(sdf.age >= 18)  # tracked filter
                prepared = adults.withColumn(
                    "spend_per_product", F.col("monthly_spend") / F.col("num_products")
                )
            # Feature engineering with spark.ml -- each transformer is auto-opaque (one node).
            with cf.describe(
                "Encode and assemble features", purpose="Index/one-hot region, assemble, scale"
            ):
                prepared = (
                    StringIndexer(inputCol="region", outputCol="region_idx", handleInvalid="keep")
                    .fit(prepared)
                    .transform(prepared)
                )
                prepared = (
                    OneHotEncoder(inputCols=["region_idx"], outputCols=["region_ohe"])
                    .fit(prepared)
                    .transform(prepared)
                )
                prepared = VectorAssembler(
                    inputCols=NUMERIC + ["region_ohe"], outputCol="features_raw", handleInvalid="keep"
                ).transform(prepared)
                prepared = (
                    StandardScaler(
                        inputCol="features_raw", outputCol="features", withMean=True, withStd=True
                    )
                    .fit(prepared)
                    .transform(prepared)
                )
            with cf.describe(
                "Score churn risk",
                purpose="Predicted probability of churn per customer",
                risks=cf.risk(
                    "fairness.proxy_variable",
                    note="region is a model feature and may proxy for protected attributes",
                    mitigation="monitor per-region calibration and review feature importance",
                    owner="ml-governance",
                ),
            ):
                model = LogisticRegression(
                    featuresCol="features", labelCol="churned", probabilityCol="probability"
                ).fit(prepared)
                scored = model.transform(prepared)  # spark.ml -> auto-opaque node
                # Extract P(churn) from the probability vector (a tracked withColumn).
                # Element 1 is the probability of label 1 (churned) for a 0/1 label.
                scored = scored.withColumn("churn_proba", vector_to_array(F.col("probability"))[1])
            with cf.describe(
                "Churn risk by region", purpose="Average predicted churn probability per UK nation"
            ):
                by_region = scored.groupBy("region").agg(
                    F.count(F.lit(1)).alias("customers"),
                    F.round(F.mean("churn_proba"), 3).alias("mean_churn_proba"),
                )
            html = cf.to_html(out, title="Churn scoring by region (PySpark + spark.ml)")
            # collect() brings the rows to the driver, so they outlive spark.stop().
            rows = by_region.orderBy("region").collect()
        finally:
            cf.restore()
    finally:
        spark.stop()

    print(f"wrote {out} ({len(html):,} bytes)")
    for r in rows:
        print(
            f"  {r['region']:<18} customers={r['customers']:<4} mean_churn_proba={r['mean_churn_proba']}"
        )
    return out
if __name__ == "__main__":
    # Script entry point: build the report at its default location.
    main(out=None)