Feature engineering (pyspark.ml)
How library built-ins impact the diagram, and how opaque modules contain them. Two runs: managed (opaque on) vs raw (opaque off).
Example code
Source: examples/example_feature_engineering_spark.py
"""Toy feature-engineering pipeline on PySpark, using ``pyspark.ml`` transformers.
This example shows **how library built-ins impact the process diagram** and how
conformare contains them.
Two failure modes appear when you track a pipeline that leans on ``pyspark.ml``:
* **Lineage breaks.** Most ``pyspark.ml`` transformers are JVM-backed: their
``.transform()`` builds the result DataFrame directly from Java and never calls the
Python DataFrame methods conformare patches. The result is an *orphan* node,
disconnected from its input -- the lineage silently snaps at every transformer.
* **The graph explodes.** A "chatty" Python feature block (many ``withColumn`` calls)
records one node per step, burying the shape of the pipeline.
conformare handles the first automatically: **``pyspark.ml`` is opaque by default**
(see ``cf.opaque_module`` / ``cf.set_opaque_modules``), so every transformer is
recorded as a single connected node -- input frame -> output frame, output columns
captured and profiled -- with its internals suppressed. For your *own* chatty code
you opt in with ``@cf.opaque``.
The script runs the same pipeline twice:
* **raw** -- ``set_opaque_modules([])`` disables the default, so you see the
unmanaged impact: the chatty block explodes and the ML transformers orphan.
* **managed** -- the ``pyspark.ml`` default is on and the chatty block is wrapped in
``cf.opaque``, so every step is a single connected, profiled node.
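Run: python examples/example_feature_engineering_spark.py
Then open output/feature_report_raw.html and output/feature_report_managed.html
(the ``output/`` directory is created next to ``examples/``, one level above this script).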
"""
import os
import tempfile
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import (
Bucketizer,
OneHotEncoder,
StandardScaler,
StringIndexer,
VectorAssembler,
)
import conformare as cf
REGIONS = ["London", "Manchester", "Leeds", "Bristol", "Glasgow"]
PLANS = ["basic", "standard", "premium"]
def _write_customers(path, n=400):
"""Write the synthetic source to CSV with pandas (untracked), so reading it back
with the hooked ``spark.read`` gives a properly named source node."""
rows = [
(
i,
REGIONS[i % len(REGIONS)],
PLANS[i % len(PLANS)],
18 + (i * 7) % 50, # age 18..67
float((i * 13) % 200), # watch_minutes 0..199
)
for i in range(n)
]
pd.DataFrame(rows, columns=["customer_id", "region", "plan", "age", "watch_minutes"]).to_csv(
path, index=False
)
def derive_ratio_features(df):
    """Add five engineered ratio/flag columns, one ``withColumn`` call each.

    A chatty, pure-DataFrame block -- five withColumns. Tracked raw, this is five
    separate nodes; wrapped in cf.opaque, it is one.

    Args:
        df: Spark DataFrame with ``age`` and ``watch_minutes`` columns usable in
            arithmetic, and a string ``plan`` column.

    Returns:
        ``df`` plus ``watch_per_age``, ``is_adult``, ``heavy_watcher``,
        ``plan_premium`` and ``age_squared``.
    """
    # The five separate withColumn calls are deliberate: this is the example's
    # "chatty" code, so do not collapse it into one select/withColumns call.
    # The synthetic ages from _write_customers are 18..67, so the divisor is never 0 here.
    df = df.withColumn("watch_per_age", F.col("watch_minutes") / F.col("age"))
    # NOTE(review): with the synthetic data (age 18..67) this flag is always 1.
    df = df.withColumn("is_adult", (F.col("age") >= 18).cast("int"))
    df = df.withColumn("heavy_watcher", (F.col("watch_minutes") > 100).cast("int"))
    df = df.withColumn("plan_premium", (F.col("plan") == "premium").cast("int"))
    df = df.withColumn("age_squared", F.col("age") * F.col("age"))
    return df
def build_features(df, *, manage):
    """Run the feature-engineering steps on the customers DataFrame.

    Three ``cf.describe`` sections are recorded: the hand-written ratio features,
    categorical encoding (``StringIndexer`` + ``OneHotEncoder``) and numeric
    scaling (``Bucketizer`` + ``VectorAssembler`` + ``StandardScaler``).

    Args:
        df: Source Spark DataFrame with ``region``, ``plan``, ``age`` and
            ``watch_minutes`` columns.
        manage: When True, ``derive_ratio_features`` is wrapped in ``cf.opaque`` so it
            is recorded as one node. Whether the ``pyspark.ml`` transformers are opaque
            is not decided here; it follows ``cf.set_opaque_modules`` as set by the
            caller.

    Returns:
        ``df`` with the derived ratio/flag columns, ``region_ohe`` / ``plan_ohe``,
        ``age_bucket`` and ``scaled_features``. The intermediate ``region_idx``,
        ``plan_idx`` and ``num_vec`` columns are dropped.
    """
    # Our own chatty block: opt in to opaque only in the managed run.
    derive = cf.opaque(derive_ratio_features) if manage else derive_ratio_features
    with cf.describe(
        "Ratio features", purpose="Engineered ratios & flags", definition_owner="ml-platform"
    ):
        df = derive(df)
    # pyspark.ml transformers. Opaque by DEFAULT (managed run) -> one connected node
    # each; in the raw run (opaque modules disabled) their JVM outputs orphan.
    with cf.describe(
        "Categorical encoding",
        purpose="Index + one-hot encode (pyspark.ml)",
        definition_owner="ml-platform",
    ):
        # handleInvalid="keep" sends invalid data (unseen labels / nulls) to an extra
        # index instead of failing; here the indexer is fitted on the same frame.
        df = (
            StringIndexer(
                inputCols=["region", "plan"],
                outputCols=["region_idx", "plan_idx"],
                handleInvalid="keep",
            )
            .fit(df)
            .transform(df)
        )
        df = (
            OneHotEncoder(
                inputCols=["region_idx", "plan_idx"], outputCols=["region_ohe", "plan_ohe"]
            )
            .fit(df)
            .transform(df)
        )
        # The numeric indices were only an input to the one-hot encoder.
        df = df.drop("region_idx", "plan_idx")
    with cf.describe(
        "Scale numeric features",
        purpose="Bucket, assemble & standard-scale (pyspark.ml)",
        definition_owner="ml-platform",
    ):
        # Four age buckets split at 25, 40 and 55, open-ended at both extremes.
        df = Bucketizer(
            splits=[-float("inf"), 25, 40, 55, float("inf")], inputCol="age", outputCol="age_bucket"
        ).transform(df)
        # Pack the numeric inputs into a single vector column for the scaler.
        df = VectorAssembler(
            inputCols=["age", "watch_minutes", "age_bucket"],
            outputCol="num_vec",
            handleInvalid="keep",
        ).transform(df)
        # Centre and scale each feature (withMean=True produces dense output in Spark).
        df = (
            StandardScaler(
                inputCol="num_vec", outputCol="scaled_features", withMean=True, withStd=True
            )
            .fit(df)
            .transform(df)
        )
        # Keep only the scaled vector; the raw assembled vector is intermediate.
        df = df.drop("num_vec")
    return df
def _run(spark, csv_path, manage, out, title):
    """Run the feature pipeline once under conformare tracking and write an HTML report.

    Args:
        spark: Active ``SparkSession`` used to read the source CSV.
        csv_path: Path of the synthetic customers CSV.
        manage: True for the managed run (``pyspark.ml`` opaque, chatty block wrapped
            in ``cf.opaque``); False for the raw run (opaque modules disabled).
        out: Destination path of the HTML report.
        title: Title shown in the report.

    Returns:
        The node count of the recorded process model (``stats["nodes"]``).
    """
    cf.store.clear()
    cf.reset_context()
    # managed -> pyspark.ml opaque by default; raw -> disable so internals show through
    cf.set_opaque_modules(["pyspark.ml"] if manage else [])
    # The parquet sink only exists so the pipeline has an output node. A
    # TemporaryDirectory removes it afterwards, where mkdtemp would leak one
    # directory of parquet files per run.
    with tempfile.TemporaryDirectory(prefix="ft_features_") as sink_dir:
        cf.trackSpark()
        try:
            numeric = ["age", "watch_minutes", "watch_per_age", "age_squared", "age_bucket"]
            cf.set_profiles(
                {
                    "*": [
                        cf.rowCount,
                        cf.dataSize,
                        cf.nullFraction(columns="all"),
                        cf.histogram(columns=numeric),
                        cf.iqrOutliers(columns=numeric),
                    ]
                }
            )
            cf.profile_sources(True)
            cf.describe_process(
                "Toy feature-engineering pipeline (pyspark.ml). Demonstrates how library "
                "built-ins impact lineage and how opaque modules contain them."
            )
            customers = (
                spark.read.option("header", True).option("inferSchema", True).csv(csv_path)
            )  # hooked -> source named "customers"
            features = build_features(customers, manage=manage)
            features.write.mode("overwrite").parquet(os.path.join(sink_dir, "features"))
            html = cf.to_html(path=out, title=title)
            m = cf.build_model(cf.store)
        finally:
            # Always unhook, even if the pipeline or the report fails, so Spark is not
            # left patched for the next run or for the caller.
            cf.restore()
    stats = m["stats"]
    print(
        f"wrote {out} ({len(html):,} bytes) -- nodes={stats['nodes']}, "
        f"operations={stats['operations']}, columns={stats['columns']}"
    )
    return stats["nodes"]
def main():
    """Generate the synthetic source, run the pipeline raw then managed, and compare.

    Both HTML reports are written to an ``output/`` directory one level above this
    script's directory, and the node counts of the two runs are printed together.
    The source CSV lives in a temporary directory that is removed once Spark has
    stopped.
    """
    with tempfile.TemporaryDirectory(prefix="ft_features_src_") as src_dir:
        spark = (
            SparkSession.builder.master("local[1]")
            .appName("conformare-feature-eng")
            .config("spark.ui.enabled", "false")
            .config("spark.sql.shuffle.partitions", "4")
            .getOrCreate()
        )
        # Everything after session creation is guarded, so any failure (including
        # writing the CSV or creating the output dir) still stops Spark, and it
        # stops before the temp dir holding its input is deleted.
        try:
            spark.sparkContext.setLogLevel("ERROR")
            csv_path = os.path.join(src_dir, "customers.csv")
            _write_customers(csv_path)
            script_dir = os.path.dirname(os.path.abspath(__file__))
            outdir = os.path.join(os.path.dirname(script_dir), "output")
            os.makedirs(outdir, exist_ok=True)
            raw = _run(
                spark,
                csv_path,
                manage=False,
                out=os.path.join(outdir, "feature_report_raw.html"),
                title="Feature engineering — unmanaged (opaque modules off)",
            )
            managed = _run(
                spark,
                csv_path,
                manage=True,
                out=os.path.join(outdir, "feature_report_managed.html"),
                title="Feature engineering — pyspark.ml opaque by default",
            )
            print(
                f"\nDiagram impact: raw={raw} nodes vs managed={managed} nodes. "
                "Raw fragments the graph (chatty block explodes; JVM transformers orphan "
                "their outputs); managed keeps each pyspark.ml transformer (and the wrapped "
                "chatty block) as one connected, profiled node."
            )
        finally:
            spark.stop()


if __name__ == "__main__":
    main()