Feature engineering (pyspark.ml)

How library built-ins impact the diagram, and how opaque modules contain them. Two runs: managed (opaque on) vs raw (opaque off).

Open the full report ↗

Example code

Source: examples/example_feature_engineering_spark.py

"""Toy feature-engineering pipeline on PySpark, using ``pyspark.ml`` transformers.

This example shows **how library built-ins impact the process diagram** and how
conformare contains them.

Two failure modes appear when you track a pipeline that leans on ``pyspark.ml``:

* **Lineage breaks.** Most ``pyspark.ml`` transformers are JVM-backed: their
  ``.transform()`` builds the result DataFrame directly from Java and never calls the
  Python DataFrame methods conformare patches. The result is an *orphan* node,
  disconnected from its input -- the lineage silently snaps at every transformer.
* **The graph explodes.** A "chatty" Python feature block (many ``withColumn`` calls)
  records one node per step, burying the shape of the pipeline.

conformare handles the first automatically: **``pyspark.ml`` is opaque by default**
(see ``cf.opaque_module`` / ``cf.set_opaque_modules``), so every transformer is
recorded as a single connected node -- input frame -> output frame, output columns
captured and profiled -- with its internals suppressed. For your *own* chatty code
you opt in with ``@cf.opaque``.

The script runs the same pipeline twice:

* **raw** -- ``set_opaque_modules([])`` disables the default, so you see the
  unmanaged impact: the chatty block explodes and the ML transformers orphan.
* **managed** -- the ``pyspark.ml`` default is on and the chatty block is wrapped in
  ``cf.opaque``, so every step is a single connected, profiled node.

Run:  python examples/example_feature_engineering_spark.py
Then open feature_report_raw.html and feature_report_managed.html.
"""

import os
import tempfile

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import (
    Bucketizer,
    OneHotEncoder,
    StandardScaler,
    StringIndexer,
    VectorAssembler,
)

import conformare as cf

REGIONS = ["London", "Manchester", "Leeds", "Bristol", "Glasgow"]
PLANS = ["basic", "standard", "premium"]


def _write_customers(path, n=400):
    """Write the synthetic source to CSV with pandas (untracked), so reading it back
    with the hooked ``spark.read`` gives a properly named source node."""
    rows = [
        (
            i,
            REGIONS[i % len(REGIONS)],
            PLANS[i % len(PLANS)],
            18 + (i * 7) % 50,  # age 18..67
            float((i * 13) % 200),  # watch_minutes 0..199
        )
        for i in range(n)
    ]
    pd.DataFrame(rows, columns=["customer_id", "region", "plan", "age", "watch_minutes"]).to_csv(
        path, index=False
    )


def derive_ratio_features(df):
    """A chatty, pure-DataFrame block -- five withColumns. Tracked raw, this is five
    separate nodes; wrapped in cf.opaque, it is one."""
    df = df.withColumn("watch_per_age", F.col("watch_minutes") / F.col("age"))
    df = df.withColumn("is_adult", (F.col("age") >= 18).cast("int"))
    df = df.withColumn("heavy_watcher", (F.col("watch_minutes") > 100).cast("int"))
    df = df.withColumn("plan_premium", (F.col("plan") == "premium").cast("int"))
    df = df.withColumn("age_squared", F.col("age") * F.col("age"))
    return df


def build_features(df, *, manage):
    # Our own chatty block: opt in to opaque only in the managed run.
    derive = cf.opaque(derive_ratio_features) if manage else derive_ratio_features
    with cf.describe(
        "Ratio features", purpose="Engineered ratios & flags", definition_owner="ml-platform"
    ):
        df = derive(df)

    # pyspark.ml transformers. Opaque by DEFAULT (managed run) -> one connected node
    # each; in the raw run (opaque modules disabled) their JVM outputs orphan.
    with cf.describe(
        "Categorical encoding",
        purpose="Index + one-hot encode (pyspark.ml)",
        definition_owner="ml-platform",
    ):
        df = (
            StringIndexer(
                inputCols=["region", "plan"],
                outputCols=["region_idx", "plan_idx"],
                handleInvalid="keep",
            )
            .fit(df)
            .transform(df)
        )
        df = (
            OneHotEncoder(
                inputCols=["region_idx", "plan_idx"], outputCols=["region_ohe", "plan_ohe"]
            )
            .fit(df)
            .transform(df)
        )
        df = df.drop("region_idx", "plan_idx")

    with cf.describe(
        "Scale numeric features",
        purpose="Bucket, assemble & standard-scale (pyspark.ml)",
        definition_owner="ml-platform",
    ):
        df = Bucketizer(
            splits=[-float("inf"), 25, 40, 55, float("inf")], inputCol="age", outputCol="age_bucket"
        ).transform(df)
        df = VectorAssembler(
            inputCols=["age", "watch_minutes", "age_bucket"],
            outputCol="num_vec",
            handleInvalid="keep",
        ).transform(df)
        df = (
            StandardScaler(
                inputCol="num_vec", outputCol="scaled_features", withMean=True, withStd=True
            )
            .fit(df)
            .transform(df)
        )
        df = df.drop("num_vec")
    return df


def _run(spark, csv_path, manage, out, title):
    cf.store.clear()
    cf.reset_context()
    # managed -> pyspark.ml opaque by default; raw -> disable so internals show through
    cf.set_opaque_modules(["pyspark.ml"] if manage else [])
    cf.trackSpark()
    numeric = ["age", "watch_minutes", "watch_per_age", "age_squared", "age_bucket"]
    cf.set_profiles(
        {
            "*": [
                cf.rowCount,
                cf.dataSize,
                cf.nullFraction(columns="all"),
                cf.histogram(columns=numeric),
                cf.iqrOutliers(columns=numeric),
            ]
        }
    )
    cf.profile_sources(True)
    cf.describe_process(
        "Toy feature-engineering pipeline (pyspark.ml). Demonstrates how library "
        "built-ins impact lineage and how opaque modules contain them."
    )

    customers = (
        spark.read.option("header", True).option("inferSchema", True).csv(csv_path)
    )  # hooked -> source named "customers"
    features = build_features(customers, manage=manage)
    features.write.mode("overwrite").parquet(
        os.path.join(tempfile.mkdtemp(prefix="ft_features_"), "features")
    )

    html = cf.to_html(path=out, title=title)
    m = cf.build_model(cf.store)
    cf.restore()
    print(
        f"wrote {out} ({len(html):,} bytes) -- nodes={m['stats']['nodes']}, "
        f"operations={m['stats']['operations']}, columns={m['stats']['columns']}"
    )
    return m["stats"]["nodes"]


def main():
    spark = (
        SparkSession.builder.master("local[1]")
        .appName("conformare-feature-eng")
        .config("spark.ui.enabled", "false")
        .config("spark.sql.shuffle.partitions", "4")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("ERROR")
    csv_path = os.path.join(tempfile.mkdtemp(prefix="ft_features_src_"), "customers.csv")
    _write_customers(csv_path)
    outdir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "output")
    os.makedirs(outdir, exist_ok=True)
    try:
        raw = _run(
            spark,
            csv_path,
            manage=False,
            out=os.path.join(outdir, "feature_report_raw.html"),
            title="Feature engineering — unmanaged (opaque modules off)",
        )
        managed = _run(
            spark,
            csv_path,
            manage=True,
            out=os.path.join(outdir, "feature_report_managed.html"),
            title="Feature engineering — pyspark.ml opaque by default",
        )
        print(
            f"\nDiagram impact: raw={raw} nodes vs managed={managed} nodes. "
            "Raw fragments the graph (chatty block explodes; JVM transformers orphan "
            "their outputs); managed keeps each pyspark.ml transformer (and the wrapped "
            "chatty block) as one connected, profiled node."
        )
    finally:
        spark.stop()


if __name__ == "__main__":
    main()

Output report

Managed

Open in a new tab ↗

Raw

Open in a new tab ↗


This site uses Just the Docs, a documentation theme for Jekyll.