Great Expectations on PySpark
Validation bridged from Spark to pandas via Narwhals sampling.
Example code
Source: examples/example_great_expectations_spark.py
"""Great Expectations checkpoints on a **PySpark** pipeline.
Same idea as ``example_great_expectations.py`` but on Spark: the ``greatExpectations``
profiler validates the Spark frame at each checkpoint (here via ``force_profile``) and the
report shows which step breaks the data contract — with critical failures hard (red) and
warnings advisory (amber).
How GX runs on Spark here: the profiler **samples** the frame first (``backend.sample`` ->
``limit``) and bridges that sample to pandas (``toPandas``), then runs GX's pandas engine.
So validation is over a bounded sample collected to the driver, **not** the full
distributed dataset (GX's own Spark engine is not used). Keep the sample bounded on large
data, or pass ``sample=None`` for full validation (which collects the whole frame — costly).
``great-expectations`` is optional: without it the profiler degrades to a status note.
Run: python examples/example_great_expectations_spark.py
Then open output/great_expectations_spark_report.html (relative to the repository root).
"""
import os
import tempfile
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import conformare as cf
# ``great-expectations`` is an optional dependency. HAVE_GX starts out False and
# is flipped only after both the import and the ``gx.expectations`` lookup have
# succeeded, so any failure in either step leaves the flag False. ``GXE`` is
# bound only on success; guard every use of it with HAVE_GX.
HAVE_GX = False
try:
    import great_expectations as gx

    GXE = gx.expectations
    HAVE_GX = True
except Exception:
    # Deliberately broad: the example falls back to portable dict expectations
    # whenever the import or the attribute lookup above fails.
    pass
# Regions accepted by the "region in set" expectation and by the clean-step filter.
UK_REGIONS = ["London", "Manchester", "Leeds", "Bristol", "Glasgow"]
# Regions cycled through when generating the raw feed: the UK ones plus two
# non-UK entries, so the raw data contains rows outside UK_REGIONS.
ALL_REGIONS = [*UK_REGIONS, "Dublin", "Paris"]
def _checks():
    """Build the three customer-feed expectations in whichever form is usable.

    The expectations are: ``age`` between 18 and 120 (severity ``critical``),
    ``region`` within ``UK_REGIONS`` (severity ``warning``), and ``email`` not
    null (no explicit severity).

    Returns:
        A list of three expectations, in that order: native GX expectation
        objects when ``HAVE_GX`` is true, otherwise dicts with
        ``expectation_type`` and ``kwargs`` keys describing the same checks.
    """
    if not HAVE_GX:
        # Portable fallback: plain dicts, no GX classes required.
        return [
            {
                "expectation_type": "expect_column_values_to_be_between",
                "kwargs": {
                    "column": "age",
                    "min_value": 18,
                    "max_value": 120,
                    "severity": "critical",
                },
            },
            {
                "expectation_type": "expect_column_values_to_be_in_set",
                "kwargs": {
                    "column": "region",
                    "value_set": UK_REGIONS,
                    "severity": "warning",
                },
            },
            {
                "expectation_type": "expect_column_values_to_not_be_null",
                "kwargs": {"column": "email"},
            },
        ]

    age_in_range = GXE.ExpectColumnValuesToBeBetween(
        column="age", min_value=18, max_value=120, severity="critical"
    )
    region_is_uk = GXE.ExpectColumnValuesToBeInSet(
        column="region", value_set=UK_REGIONS, severity="warning"
    )
    email_present = GXE.ExpectColumnValuesToNotBeNull(column="email")
    return [age_in_range, region_is_uk, email_present]
def _write_customers(path, n=300):
    """Write a synthetic customer feed of ``n`` rows to ``path`` as CSV.

    The columns are ``customer_id``, ``full_name``, ``email``, ``region`` and
    ``age``. Regions cycle through ``ALL_REGIONS`` and ages cycle through
    16..65, so the feed contains rows that violate the UK-region and 18+
    expectations.

    Args:
        path: Destination CSV path; written with a header and no index column.
        n: Number of rows to generate.
    """
    row_ids = range(n)
    region_count = len(ALL_REGIONS)
    columns = {
        "customer_id": row_ids,
        "full_name": [f"Customer {i}" for i in row_ids],
        "email": [f"user{i}@example.com" for i in row_ids],
        "region": [ALL_REGIONS[i % region_count] for i in row_ids],
        # 16 + (i * 7) % 50 starts at 16, so some customers are under 18.
        "age": [16 + (i * 7) % 50 for i in row_ids],
    }
    feed = pd.DataFrame(columns)
    feed.to_csv(path, index=False)
def _build_spark():
    """Return the single-core local Spark session (UI disabled) used by the example."""
    return (
        SparkSession.builder.master("local[1]")
        .appName("conformare-gx-spark")
        .config("spark.ui.enabled", "false")
        .config("spark.sql.shuffle.partitions", "4")
        .getOrCreate()
    )


def _run_pipeline(spark, tmp):
    """Run the ingest: read ``customers.csv`` from ``tmp``, screen, clean, write parquet."""
    cf.set_profiles({"*": [cf.rowCount, cf.dataSize]})
    cf.describe_process(
        "PySpark customer ingest with Great Expectations checkpoints at the raw and "
        "cleaned stages — GX validates a sample of each Spark frame (bridged to pandas) "
        "and the report flags where an expectation first fails."
    )
    # One profiler per checkpoint, both built from the same expectations.
    gx_raw = cf.greatExpectations(*_checks(), hard_severities=("critical", "error"))
    gx_clean = cf.greatExpectations(*_checks(), hard_severities=("critical", "error"))

    customers = (
        spark.read.option("header", True)
        .option("inferSchema", True)
        .csv(os.path.join(tmp, "customers.csv"))
    )  # hooked -> source "customers"

    with (
        cf.describe("Raw screen", purpose="Validate the incoming customer feed"),
        cf.force_profile(gx_raw, only="last", cache=True),
    ):
        screened = customers.select("customer_id", "full_name", "email", "region", "age")

    with (
        cf.describe("Clean customers", purpose="Adults (18+) resident in the UK"),
        cf.force_profile(gx_clean, only="last", cache=True),
    ):
        uk_adults = screened.filter(F.col("age") >= 18).filter(F.col("region").isin(UK_REGIONS))

    uk_adults.write.mode("overwrite").parquet(os.path.join(tmp, "uk_adults"))


def _print_summary(out, html):
    """Print the report location and size, then the per-node expectation outcomes."""
    m = cf.build_model(cf.store)
    ex = m["stats"]["expectations"]
    print(
        f"wrote {out} ({len(html):,} bytes) [great-expectations {'installed' if HAVE_GX else 'NOT installed'}]"
    )
    print(f" evaluated={ex['evaluated']}, failed={ex['failed']}, hard={ex['failed_hard']}")
    for n in m["nodes"]:
        e = n.get("expectations")
        # Only nodes whose expectations block has status "ok" list results.
        if not (e and e.get("status") == "ok"):
            continue
        for r in e["results"]:
            mark = "PASS" if r["success"] else ("HARD" if r.get("hard") else "warn")
            print(f" [{mark}] {n['label']}: {r['expectation']} (severity={r.get('severity')})")


def main(out=None):
    """Run the example pipeline and write the HTML report.

    Generates a synthetic customer CSV in a temporary directory, runs the
    tracked Spark pipeline with a Great Expectations checkpoint on each step,
    writes the report, and prints a short pass/fail summary.

    Args:
        out: Path of the HTML report. When falsy, defaults to
            ``output/great_expectations_spark_report.html`` under the parent
            of this file's directory.

    Returns:
        The path the report was written to.
    """
    import shutil  # local import: only needed for the temp-dir cleanup below

    out = out or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        "output",
        "great_expectations_spark_report.html",
    )
    # A bare filename has an empty dirname, and os.makedirs("") raises.
    out_dir = os.path.dirname(out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    tmp = tempfile.mkdtemp(prefix="ft_gx_spark_")
    spark = None
    try:
        spark = _build_spark()
        spark.sparkContext.setLogLevel("ERROR")
        _write_customers(os.path.join(tmp, "customers.csv"))

        cf.trackSpark()
        try:
            _run_pipeline(spark, tmp)
            html = cf.to_html(
                path=out, title="Great Expectations checkpoints (PySpark) — conformare report"
            )
            _print_summary(out, html)
        finally:
            # Undo the tracking patches even when a pipeline step raised.
            cf.restore()  # unpatch + release cached frames
    finally:
        # Stop Spark before deleting the files it read and wrote.
        if spark is not None:
            spark.stop()
        shutil.rmtree(tmp, ignore_errors=True)
    return out
# Script entry point: build the report only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()