Cross-engine: Spark -> pandas -> Spark

A pipeline that filters in Spark, engineers features in pandas, then hands the result back to Spark for the final step. Because the `toPandas()` and `createDataFrame()` boundaries are tracked, the whole run stays one connected lineage instead of three disconnected chains.

Open the full report ↗

Example code

Source: examples/example_spark_pandas_roundtrip.py

"""Cross-engine pipeline: start in PySpark, move to pandas, and back -- one lineage.

Pipelines often hop engines: do the heavy filtering/joining in Spark, drop to pandas for
something the pandas API expresses cleanly, then go back to Spark to write. Conformare
tracks the conversion boundaries, so the whole thing stays **one connected graph**
instead of three disconnected chains.

Tracked conversion boundaries:
* ``DataFrame.toPandas()``               Spark      -> pandas
* ``SparkSession.createDataFrame(pdf)``  pandas     -> Spark
* ``DataFrame.pandas_api()``             Spark      -> pandas-on-Spark
* ``ps.DataFrame.to_pandas()``           pandas-on-Spark -> pandas
* ``ps.DataFrame.to_spark()``            pandas-on-Spark -> Spark

This example uses the real-pandas path (``toPandas`` / ``createDataFrame``). Enable both
adapters so the pandas-stage operations are tracked too::

    cf.trackSpark()
    cf.trackPandas()

Run:  python examples/example_spark_pandas_roundtrip.py
Then open output/spark_pandas_roundtrip_report.html.
"""

import os

import pandas as pd
from pyspark.sql import SparkSession

import conformare as cf

REGIONS = ["England", "Scotland", "Wales", "Northern Ireland"]


def _make_pdf(n: int = 200) -> pd.DataFrame:
    rows = [
        (i, REGIONS[i % len(REGIONS)], 18 + (i * 7) % 50, float((i * 13) % 300), 1 + (i % 9))
        for i in range(n)
    ]
    return pd.DataFrame(rows, columns=["customer_id", "region", "age", "spend", "tenure"])


def _reachable_from_sources(model):
    """Return the set of node ids reachable from any ``"source"`` node of *model*.

    *model* is the dict produced by ``cf.build_model``; only its ``"nodes"``
    (entries with ``"id"`` and ``"op"``) and ``"edges"`` (entries with
    ``"source"`` and ``"target"``) are read. Edges are followed in their stored
    direction using an iterative depth-first search. With several source nodes,
    the result is the union of what is reachable from each of them.
    """
    adj = {}
    for e in model["edges"]:
        adj.setdefault(e["source"], []).append(e["target"])
    seen = set()
    stack = [n["id"] for n in model["nodes"] if n["op"] == "source"]
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(adj.get(node, []))
    return seen


def main(out=None):
    """Run the Spark -> pandas -> Spark example and write its lineage report.

    Args:
        out: Path of the HTML report to write. When falsy, defaults to
            ``output/spark_pandas_roundtrip_report.html`` under the parent of
            this script's directory. Missing parent directories are created.

    Returns:
        The report path that was written.
    """
    out = out or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        "output",
        "spark_pandas_roundtrip_report.html",
    )
    # A bare filename has no directory part; os.makedirs("") would raise.
    out_dir = os.path.dirname(out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    spark = (
        SparkSession.builder.master("local[1]")
        .appName("conformare-roundtrip")
        .config("spark.ui.enabled", "false")
        .config("spark.sql.shuffle.partitions", "4")
        .getOrCreate()
    )
    try:
        spark.sparkContext.setLogLevel("ERROR")

        cf.trackSpark()  # Spark ops + the conversion boundaries
        try:
            cf.trackPandas()  # the pandas-stage ops
            cf.set_profiles({"*": [cf.rowCount, cf.dataSize]})
            cf.describe_process(
                "Cross-engine customer analytics: filter in Spark, engineer features in pandas, "
                "write back through Spark -- tracked end to end across both conversions."
            )

            with cf.describe("Filter adults (Spark)", purpose="Keep scorable customers"):
                # Spark source, seeded from the local synthetic pandas frame.
                customers = spark.createDataFrame(_make_pdf())
                # _make_pdf() only generates ages >= 18, so this keeps every row.
                adults = customers.filter(customers.age >= 18)

            with cf.describe("Engineer features (pandas)", purpose="Per-customer and per-region features"):
                pdf = adults.toPandas()  # Spark -> pandas
                # tenure is 1..9 in the synthetic data, so there is no division by zero.
                pdf = pdf.assign(spend_per_year=pdf["spend"] / pdf["tenure"])
                region_avg = pdf.groupby("region", as_index=False).agg(region_avg_spend=("spend", "mean"))
                enriched = pdf.merge(region_avg, on="region", how="left")  # two pandas parents

            with cf.describe("Persist (Spark)", purpose="Return to Spark for the write"):
                sdf = spark.createDataFrame(enriched)  # pandas -> Spark
                # Keeps only customers with a positive spend_per_year (drops zero spend).
                final = sdf.filter(sdf.spend_per_year > 0)

            html = cf.to_html(out, title="Spark -> pandas -> Spark (one lineage)")

            # Confirm the whole pipeline is one connected graph: count the
            # nodes reachable from the source node(s).
            model = cf.build_model(cf.store)
            seen = _reachable_from_sources(model)

            # Spark action; note it runs after the report above has been rendered.
            rows = final.count()
        finally:
            # Pair the track*() calls above with restore(), even if a stage failed.
            cf.restore()
    finally:
        # Release the local Spark session even if the pipeline raised.
        spark.stop()

    print(f"wrote {out} ({len(html):,} bytes)")
    print(f"  ops        : {[e.op for e in cf.lineage()]}")
    print(f"  connected  : {len(seen)}/{model['stats']['nodes']} nodes reachable from source")
    print(f"  final rows : {rows}")
    return out


if __name__ == "__main__":
    # Script entry point: write the report to the default location.
    main(out=None)

Output report

Open in a new tab ↗


This site uses Just the Docs, a documentation theme for Jekyll.