Great Expectations on PySpark

Validation bridged from Spark to pandas via Narwhals sampling.

Open the full report ↗

Example code

Source: examples/example_great_expectations_spark.py

"""Great Expectations checkpoints on a **PySpark** pipeline.

Same idea as ``example_great_expectations.py`` but on Spark: the ``greatExpectations``
profiler validates the Spark frame at each checkpoint (here via ``force_profile``) and the
report shows which step breaks the data contract — with critical failures hard (red) and
warnings advisory (amber).

How GX runs on Spark here: the profiler **samples** the frame first (``backend.sample`` ->
``limit``) and bridges that sample to pandas (``toPandas``), then runs GX's pandas engine.
So validation is over a bounded sample collected to the driver, **not** the full
distributed dataset (GX's own Spark engine is not used). Keep the sample bounded on large
data, or pass ``sample=None`` for full validation (which collects the whole frame — costly).

``great-expectations`` is optional: without it the profiler degrades to a status note.

Run:  python examples/example_great_expectations_spark.py
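Then open ``output/great_expectations_spark_report.html`` (the ``output/`` directory
beside ``examples/``), unless a different path was passed to ``main``.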
"""

import os
import tempfile

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import conformare as cf

# ``great-expectations`` is an optional dependency: any failure while importing it
# (or while resolving ``gx.expectations``) just switches the example to the
# dict-based expectation specs built in ``_checks``.
try:
    import great_expectations as gx

    GXE = gx.expectations
except Exception:
    HAVE_GX = False
else:
    HAVE_GX = True

# Regions accepted by the data contract.
UK_REGIONS = [
    "London",
    "Manchester",
    "Leeds",
    "Bristol",
    "Glasgow",
]
# Every region that appears in the raw feed: the UK ones plus non-UK rows.
ALL_REGIONS = [*UK_REGIONS, "Dublin", "Paris"]


def _checks():
    """Build the three expectations applied at every checkpoint.

    Returns:
        A three-item list covering, in order: ``age`` between 18 and 120
        (severity ``critical``), ``region`` within ``UK_REGIONS`` (severity
        ``warning``) and ``email`` not null. The items are native GX expectation
        objects when ``HAVE_GX`` is true, otherwise plain dicts with
        ``expectation_type`` / ``kwargs`` keys describing the same checks.
    """
    if not HAVE_GX:
        # Portable fallback: the same three checks spelled as dict specs.
        age_kwargs = {"column": "age", "min_value": 18, "max_value": 120, "severity": "critical"}
        region_kwargs = {"column": "region", "value_set": UK_REGIONS, "severity": "warning"}
        email_kwargs = {"column": "email"}
        return [
            {"expectation_type": "expect_column_values_to_be_between", "kwargs": age_kwargs},
            {"expectation_type": "expect_column_values_to_be_in_set", "kwargs": region_kwargs},
            {"expectation_type": "expect_column_values_to_not_be_null", "kwargs": email_kwargs},
        ]

    age_in_range = GXE.ExpectColumnValuesToBeBetween(
        column="age", min_value=18, max_value=120, severity="critical"
    )
    region_is_uk = GXE.ExpectColumnValuesToBeInSet(
        column="region", value_set=UK_REGIONS, severity="warning"
    )
    email_present = GXE.ExpectColumnValuesToNotBeNull(column="email")
    return [age_in_range, region_is_uk, email_present]


def _write_customers(path, n=300):
    """Write a synthetic customer feed of ``n`` rows to ``path`` as CSV.

    Args:
        path: Destination CSV file.
        n: Number of rows to generate.

    Regions cycle through ``ALL_REGIONS`` and ages through ``16 + (i * 7) % 50``,
    so the feed contains both non-UK rows and under-18 customers.
    """
    region_count = len(ALL_REGIONS)
    names, emails, regions, ages = [], [], [], []
    for i in range(n):
        names.append(f"Customer {i}")
        emails.append(f"user{i}@example.com")
        regions.append(ALL_REGIONS[i % region_count])
        ages.append(16 + (i * 7) % 50)  # some under 18

    frame = pd.DataFrame(
        {
            "customer_id": range(n),
            "full_name": names,
            "email": emails,
            "region": regions,
            "age": ages,
        }
    )
    frame.to_csv(path, index=False)


def main(out=None):
    """Run the PySpark example pipeline and write the conformare HTML report.

    The pipeline reads a synthetic customer CSV with Spark, screens it through a
    "raw" checkpoint, filters to UK adults through a "clean" checkpoint, writes the
    result as parquet to a scratch directory, renders the report and prints a
    per-node summary of the expectation results.

    Args:
        out: Path of the HTML report to write. When falsy, defaults to
            ``great_expectations_spark_report.html`` inside an ``output``
            directory that sits beside the directory containing this file.

    Returns:
        The path the report was written to.
    """
    import shutil  # local import: only needed for scratch-directory cleanup

    out = out or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        "output",
        "great_expectations_spark_report.html",
    )
    # A bare filename has an empty dirname, and os.makedirs("") raises.
    out_dir = os.path.dirname(out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    spark = (
        SparkSession.builder.master("local[1]")
        .appName("conformare-gx-spark")
        .config("spark.ui.enabled", "false")
        .config("spark.sql.shuffle.partitions", "4")
        .getOrCreate()
    )
    tmp = None
    tracking = False
    try:
        spark.sparkContext.setLogLevel("ERROR")

        tmp = tempfile.mkdtemp(prefix="ft_gx_spark_")
        customers_csv = os.path.join(tmp, "customers.csv")
        _write_customers(customers_csv)

        cf.trackSpark()
        tracking = True  # from here on cf.restore() must run, even on failure
        cf.set_profiles({"*": [cf.rowCount, cf.dataSize]})
        cf.describe_process(
            "PySpark customer ingest with Great Expectations checkpoints at the raw and "
            "cleaned stages — GX validates a sample of each Spark frame (bridged to pandas) "
            "and the report flags where an expectation first fails."
        )

        GX_RAW = cf.greatExpectations(*_checks(), hard_severities=("critical", "error"))
        GX_CLEAN = cf.greatExpectations(*_checks(), hard_severities=("critical", "error"))

        customers = (
            spark.read.option("header", True)
            .option("inferSchema", True)
            .csv(customers_csv)
        )  # hooked -> source "customers"

        with (
            cf.describe("Raw screen", purpose="Validate the incoming customer feed"),
            cf.force_profile(GX_RAW, only="last", cache=True),
        ):
            screened = customers.select("customer_id", "full_name", "email", "region", "age")

        with (
            cf.describe("Clean customers", purpose="Adults (18+) resident in the UK"),
            cf.force_profile(GX_CLEAN, only="last", cache=True),
        ):
            uk_adults = screened.filter(F.col("age") >= 18).filter(
                F.col("region").isin(UK_REGIONS)
            )

        uk_adults.write.mode("overwrite").parquet(os.path.join(tmp, "uk_adults"))

        html = cf.to_html(
            path=out, title="Great Expectations checkpoints (PySpark) — conformare report"
        )
        m = cf.build_model(cf.store)
        ex = m["stats"]["expectations"]
        print(
            f"wrote {out} ({len(html):,} bytes)  [great-expectations {'installed' if HAVE_GX else 'NOT installed'}]"
        )
        print(f"  evaluated={ex['evaluated']}, failed={ex['failed']}, hard={ex['failed_hard']}")
        for node in m["nodes"]:
            e = node.get("expectations")
            # Only nodes whose expectation run completed carry per-check results.
            if not (e and e.get("status") == "ok"):
                continue
            for r in e["results"]:
                mark = "PASS" if r["success"] else ("HARD" if r.get("hard") else "warn")
                print(
                    f"    [{mark}] {node['label']}: {r['expectation']} (severity={r.get('severity')})"
                )
    finally:
        # Clean up on every exit path so a failing run does not leave Spark patched,
        # the session running, or the scratch directory behind.
        if tracking:
            cf.restore()  # unpatch + release cached frames
        spark.stop()
        if tmp is not None:
            shutil.rmtree(tmp, ignore_errors=True)
    return out


if __name__ == "__main__":
    main()

Output report

Open in a new tab ↗


This site uses Just the Docs, a documentation theme for Jekyll.