ML scoring by region (PySpark + spark.ml)

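The same churn-scoring scenario as the pandas example, run on Spark: spark.ml transformers and the classifier are auto-opaque (each is recorded as a single node with its internals hidden), so the prep -> model -> analysis shape of the lineage stays intact.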

Open the full report ↗

Example code

Source: examples/example_ml_region_spark.py

"""PySpark + spark.ml: data prep -> classifier -> analyse predictions by region.

The same scenario as ``example_ml_region_pandas.py``, on PySpark. A feature-
engineering pipeline (index + one-hot encode region, assemble, scale) feeds a
logistic-regression classifier; we extract the predicted churn probability and
analyse it against the customer's UK nation (England, Scotland, Wales, Northern
Ireland).

Here **auto-opaque does the work**: ``pyspark.ml`` is opaque by default, so every
feature transformer and the classifier's ``.transform()`` is recorded as a single
connected node (input frame in, output frame out) with its internals suppressed.
You see the shape of the pipeline -- prep -> features -> model -> analysis -- without
the ML internals exploding or orphaning the lineage. Contrast with the pandas
example, where the scikit-learn model is contained with ``@cf.opaque`` instead.

Run:  python examples/example_ml_region_spark.py
Then open output/ml_region_spark_report.html.
"""

import os

import numpy as np
import pandas as pd
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoder, StandardScaler, StringIndexer, VectorAssembler
from pyspark.ml.functions import vector_to_array
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import conformare as cf

# UK nations used as the categorical ``region`` feature; ``_make_pdf`` samples
# customers' regions from this list.
REGIONS = ["England", "Scotland", "Wales", "Northern Ireland"]
# Numeric columns fed to VectorAssembler alongside the one-hot region vector.
# "spend_per_product" is not in the raw table: it is derived in ``main``
# (monthly_spend / num_products) before the features are assembled.
NUMERIC = ["age", "tenure_months", "monthly_spend", "num_products", "spend_per_product"]


def _make_pdf(seed: int, n: int = 600) -> pd.DataFrame:
    """Build a reproducible synthetic customer table for the churn example.

    Draws ``n`` customers from a NumPy generator seeded with ``seed``. Each row
    carries a UK nation (``region``), age, tenure, monthly spend and product
    count, plus a binary ``churned`` label sampled from a logistic model in
    which higher spend and shorter tenure raise the churn probability and a
    small per-nation offset shifts it.

    Args:
        seed: Seed for ``np.random.default_rng``; equal seeds give equal tables.
        n: Number of customer rows to generate.

    Returns:
        A DataFrame with columns ``region``, ``age``, ``tenure_months``,
        ``monthly_spend``, ``num_products`` and ``churned``.
    """
    gen = np.random.default_rng(seed)

    # The order of these draws determines the values produced for a given
    # seed, so it must stay fixed.
    nation = gen.choice(REGIONS, size=n, p=[0.55, 0.18, 0.15, 0.12])
    ages = gen.integers(18, 80, n)  # high bound is exclusive: 18..79
    months = gen.integers(1, 72, n)
    monthly = gen.normal(50, 20, n).clip(5, 200).round(2)
    n_products = gen.integers(1, 5, n)  # always >= 1, so spend per product is defined

    offsets = {"England": 0.0, "Scotland": 0.1, "Wales": 0.15, "Northern Ireland": -0.05}
    nation_offset = np.fromiter((offsets[name] for name in nation), dtype=float, count=n)

    log_odds = -1.5 + (monthly - 50) / 40.0 - (months - 36) / 40.0 + nation_offset
    p_churn = 1.0 / (1.0 + np.exp(-log_odds))
    labels = (gen.random(n) < p_churn).astype(int)

    columns = {
        "region": nation,
        "age": ages.astype("int64"),
        "tenure_months": months.astype("int64"),
        "monthly_spend": monthly,
        "num_products": n_products.astype("int64"),
        "churned": labels.astype("int64"),
    }
    return pd.DataFrame(columns)


def main(out=None):
    """Run the churn-scoring example on PySpark and write the lineage report.

    Builds a synthetic customer table and derives features. It then encodes,
    assembles and scales them with spark.ml transformers, fits a
    logistic-regression classifier, extracts P(churn), and averages it per UK
    nation. conformare tracking is enabled for Spark throughout. The report is
    written as HTML and the per-region summary is printed to stdout.

    Args:
        out: Destination path for the HTML report. When falsy, defaults to
            ``<parent of this file's directory>/output/ml_region_spark_report.html``.

    Returns:
        The report path that was written.
    """
    out = out or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        "output",
        "ml_region_spark_report.html",
    )
    # dirname() is "" for a bare filename such as "report.html", and
    # os.makedirs("") raises FileNotFoundError, so only create a real directory.
    out_dir = os.path.dirname(out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    spark = (
        SparkSession.builder.master("local[1]")
        .appName("conformare-ml-region")
        .config("spark.ui.enabled", "false")
        .config("spark.sql.shuffle.partitions", "4")
        .getOrCreate()
    )
    # Everything after session creation runs under try/finally. The session is
    # stopped, and cf.restore() is called, even when a pipeline step raises.
    try:
        spark.sparkContext.setLogLevel("ERROR")

        cf.trackSpark()  # pyspark.ml is opaque by default
        try:
            cf.set_profiles({"*": [cf.rowCount, cf.dataSize]})
            cf.describe_process(
                "Customer churn scoring on PySpark: engineer features with spark.ml, apply a "
                "logistic-regression classifier, then analyse predicted churn probability by UK nation."
            )

            sdf = spark.createDataFrame(_make_pdf(seed=2))

            with cf.describe("Prepare features", purpose="Keep scorable rows and derive features"):
                adults = sdf.filter(sdf.age >= 18)  # tracked filter
                prepared = adults.withColumn(
                    "spend_per_product", F.col("monthly_spend") / F.col("num_products")
                )

            # Feature engineering with spark.ml -- each transformer is auto-opaque (one node).
            with cf.describe(
                "Encode and assemble features", purpose="Index/one-hot region, assemble, scale"
            ):
                prepared = (
                    StringIndexer(inputCol="region", outputCol="region_idx", handleInvalid="keep")
                    .fit(prepared)
                    .transform(prepared)
                )
                prepared = (
                    OneHotEncoder(inputCols=["region_idx"], outputCols=["region_ohe"])
                    .fit(prepared)
                    .transform(prepared)
                )
                prepared = VectorAssembler(
                    inputCols=NUMERIC + ["region_ohe"], outputCol="features_raw", handleInvalid="keep"
                ).transform(prepared)
                prepared = (
                    StandardScaler(
                        inputCol="features_raw", outputCol="features", withMean=True, withStd=True
                    )
                    .fit(prepared)
                    .transform(prepared)
                )

            with cf.describe(
                "Score churn risk",
                purpose="Predicted probability of churn per customer",
                risks=cf.risk(
                    "fairness.proxy_variable",
                    note="region is a model feature and may proxy for protected attributes",
                    mitigation="monitor per-region calibration and review feature importance",
                    owner="ml-governance",
                ),
            ):
                model = LogisticRegression(
                    featuresCol="features", labelCol="churned", probabilityCol="probability"
                ).fit(prepared)
                scored = model.transform(prepared)  # spark.ml -> auto-opaque node
                # Extract P(churn) from the probability vector (a tracked withColumn).
                scored = scored.withColumn("churn_proba", vector_to_array(F.col("probability"))[1])

            with cf.describe(
                "Churn risk by region", purpose="Average predicted churn probability per UK nation"
            ):
                by_region = scored.groupBy("region").agg(
                    F.count(F.lit(1)).alias("customers"),
                    F.round(F.mean("churn_proba"), 3).alias("mean_churn_proba"),
                )

            html = cf.to_html(out, title="Churn scoring by region (PySpark + spark.ml)")
            # NOTE: ordering and collecting run after to_html(); presumably this
            # keeps the display-only step out of the rendered report.
            rows = by_region.orderBy("region").collect()
        finally:
            # Paired with cf.trackSpark() above; called on every exit path.
            cf.restore()
    finally:
        spark.stop()

    print(f"wrote {out} ({len(html):,} bytes)")
    for r in rows:
        print(
            f"  {r['region']:<18} customers={r['customers']:<4} mean_churn_proba={r['mean_churn_proba']}"
        )
    return out


if __name__ == "__main__":
    # Script entry point: run the example, writing the report to main()'s default path.
    main()

Output report

Open in a new tab ↗


This site uses Just the Docs, a documentation theme for Jekyll.