Great Expectations checkpoints
Per-expectation validation showing exactly where a data contract starts failing.
Example code
Source: examples/example_great_expectations.py
"""Great Expectations checkpoints: validate the data at chosen points in the pipeline.
The ``greatExpectations`` profiler is, in effect, a checkpoint created dynamically at each
profile point. Attach it (here with ``force_profile``) to validate the frame there and see
which expectations passed or failed — pinpointing the step where the data stops meeting
its contract.
Two ways to specify expectations are shown:
* **native GX objects** — ``gx.expectations.ExpectColumnValuesToBeBetween(...)`` — with
code completion and type-checking (preferred), and
* portable **dicts** — used automatically as a fallback when GX isn't installed.
Each expectation carries a ``severity``. By default every failure is *advisory* (amber);
``hard_severities=("critical", "error")`` escalates those to *hard* failures (red border,
fails the Expectations KPI). Here the age check is ``critical`` and the region check is a
``warning``, so on the raw feed the node shows a hard (red) failure plus an advisory one.
``great-expectations`` is optional: without it the profiler degrades to a status note.
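Run::

    pip install great-expectations   # optional
    python examples/example_great_expectations.py

Then open ``output/great_expectations_report.html`` (by default ``main()`` writes the
report into an ``output/`` directory next to ``examples/``).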
"""
import os
import tempfile
import narwhals as nw
import pandas as pd
import conformare as cf
try:
    import great_expectations as gx

    GXE = gx.expectations  # namespace holding the native expectation classes
    HAVE_GX = True
except Exception:
    # Deliberately broad: great-expectations is optional, so any failure importing
    # it or reading ``gx.expectations`` just selects the portable dict expectations.
    # Keep GXE bound so importers and static checkers never see an unbound name.
    GXE = None
    HAVE_GX = False

# Regions the cleaned feed is allowed to contain.
UK_REGIONS = ["London", "Manchester", "Leeds", "Bristol", "Glasgow"]
# The raw feed also carries non-UK rows, which the region expectation should flag.
ALL_REGIONS = [*UK_REGIONS, "Dublin", "Paris"]
def _checks():
    """Return the demo's three customer-feed expectations as a fresh list.

    The contract is the same either way; only its representation differs:

    * when great-expectations imported successfully (``HAVE_GX``), native
      ``gx.expectations`` objects are returned;
    * otherwise, portable ``{"expectation_type": ..., "kwargs": ...}`` dicts.

    Expectations, in order: ``age`` between 18 and 120 (severity ``critical``),
    ``region`` in ``UK_REGIONS`` (severity ``warning``), and ``email`` not null
    (no explicit severity).
    """
    if not HAVE_GX:
        # Portable fallback: snake_case expectation type plus its keyword arguments.
        age_rule = {
            "expectation_type": "expect_column_values_to_be_between",
            "kwargs": {"column": "age", "min_value": 18, "max_value": 120, "severity": "critical"},
        }
        region_rule = {
            "expectation_type": "expect_column_values_to_be_in_set",
            "kwargs": {"column": "region", "value_set": UK_REGIONS, "severity": "warning"},
        }
        email_rule = {
            "expectation_type": "expect_column_values_to_not_be_null",
            "kwargs": {"column": "email"},
        }
        return [age_rule, region_rule, email_rule]

    # Native GX objects: preferred, since they get code completion and type-checking.
    age_rule = GXE.ExpectColumnValuesToBeBetween(
        column="age", min_value=18, max_value=120, severity="critical"
    )
    region_rule = GXE.ExpectColumnValuesToBeInSet(
        column="region", value_set=UK_REGIONS, severity="warning"
    )
    email_rule = GXE.ExpectColumnValuesToNotBeNull(column="email")
    return [age_rule, region_rule, email_rule]
def _data(n=240):
    """Build the synthetic raw customer feed as a pandas DataFrame of ``n`` rows.

    The feed is deliberately dirty so the raw checkpoint has something to catch:

    * ``region`` cycles through ``ALL_REGIONS``, which includes non-UK entries;
    * ``age`` is ``16 + (7 * i) % 50``, so values fall in 16..65 and the first
      row is already under 18.
    """
    ids = range(n)
    regions = ALL_REGIONS
    columns = {
        "customer_id": ids,
        "full_name": [f"Customer {i}" for i in ids],
        "email": [f"user{i}@example.com" for i in ids],
        "region": [regions[i % len(regions)] for i in ids],
        "age": [16 + (i * 7) % 50 for i in ids],  # some under 18
    }
    return pd.DataFrame(columns)
def _print_summary(out, html, model):
    """Print the report location, overall expectation counts and per-node results."""
    ex = model["stats"]["expectations"]
    gx_status = "installed" if HAVE_GX else "NOT installed"
    print(f"wrote {out} ({len(html):,} bytes) [great-expectations {gx_status}]")
    print(f" evaluated={ex['evaluated']}, failed={ex['failed']}, hard={ex['failed_hard']}")
    for node in model["nodes"]:
        e = node.get("expectations")
        # Only nodes whose expectation run reported status "ok" have results to list.
        if not (e and e.get("status") == "ok"):
            continue
        for r in e["results"]:
            mark = "PASS" if r["success"] else ("HARD" if r.get("hard") else "warn")
            print(f" [{mark}] {node['label']}: {r['expectation']} (severity={r.get('severity')})")


def main(out=None):
    """Run the demo pipeline, write the HTML report and print a summary.

    Parameters
    ----------
    out : str, optional
        Destination path of the HTML report. When falsy, it defaults to
        ``output/great_expectations_report.html`` under the parent of this
        file's directory.

    Returns
    -------
    str
        The path the report was written to.
    """
    out = out or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        "output",
        "great_expectations_report.html",
    )
    # A bare filename has no directory part, and os.makedirs("") would raise.
    out_dir = os.path.dirname(out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    cf.trackNarwhals()
    try:
        cf.set_profiles({"*": [cf.rowCount, cf.dataSize]})
        cf.describe_process(
            "Streaming customer ingest with Great Expectations checkpoints at the raw and "
            "cleaned stages — the report flags the step where an expectation first fails, "
            "with critical failures hard (red) and warnings advisory (amber)."
        )
        # Failures with these severities are escalated to hard (red); others stay advisory.
        hard = ("critical", "error")
        gx_raw = cf.greatExpectations(*_checks(), hard_severities=hard)
        gx_clean = cf.greatExpectations(*_checks(), hard_severities=hard)

        customers = nw.from_native(_data())
        with (
            cf.describe("Raw screen", purpose="Validate the incoming customer feed"),
            cf.force_profile(gx_raw, only="last"),
        ):
            screened = customers.select("customer_id", "full_name", "email", "region", "age")
        with (
            cf.describe("Clean customers", purpose="Adults (18+) resident in the UK"),
            cf.force_profile(gx_clean, only="last"),
        ):
            uk_adults = screened.filter(nw.col("age") >= 18).filter(
                nw.col("region").is_in(UK_REGIONS)
            )

        # The CSV is only a pipeline sink for the report. Keep its scratch directory
        # alive while the report and model are built, then remove it.
        with tempfile.TemporaryDirectory(prefix="ft_gx_", ignore_cleanup_errors=True) as tmp:
            uk_adults.write_csv(os.path.join(tmp, "uk_adults.csv"))
            html = cf.to_html(
                path=out, title="Great Expectations checkpoints — conformare report"
            )
            model = cf.build_model(cf.store)
            _print_summary(out, html, model)
    finally:
        # NOTE(review): presumably undoes trackNarwhals()/set_profiles(). It runs even
        # when the pipeline raises, so tracking state is not left behind — confirm.
        cf.restore()
    return out
if __name__ == "__main__":
    # Script entry point: run the demo with the default report location (out=None).
    main()