Great Expectations checkpoints

Per-expectation validation showing exactly where a data contract starts failing.

Open the full report ↗

Example code

Source: examples/example_great_expectations.py

"""Great Expectations checkpoints: validate the data at chosen points in the pipeline.

The ``greatExpectations`` profiler is, in effect, a checkpoint created dynamically at each
profile point. Attach it (here with ``force_profile``) to validate the frame there and see
which expectations passed or failed — pinpointing the step where the data stops meeting
its contract.

Two ways to specify expectations are shown:
  * **native GX objects** — ``gx.expectations.ExpectColumnValuesToBeBetween(...)`` — with
    code completion and type-checking (preferred), and
  * portable **dicts** — used automatically as a fallback when GX isn't installed.

Each expectation carries a ``severity``. By default every failure is *advisory* (amber);
``hard_severities=("critical", "error")`` escalates those to *hard* failures (red border,
fails the Expectations KPI). Here the age check is ``critical`` and the region check is a
``warning``, so on the raw feed the node shows a hard (red) failure plus an advisory one.

``great-expectations`` is optional: without it the profiler degrades to a status note.

Run:  pip install great-expectations   (optional)
      python examples/example_great_expectations.py
Then open output/great_expectations_report.html (relative to the repository root).
"""

import os
import tempfile

import narwhals as nw
import pandas as pd

import conformare as cf

# great-expectations is optional: record whether it imported so the rest of the
# script can choose between native GX objects and the portable dict fallback.
try:
    import great_expectations as gx

    GXE = gx.expectations  # short alias for the expectation classes
    HAVE_GX = True
except Exception:  # NOTE(review): broader than ImportError — presumably to tolerate a broken install too; confirm
    # GXE stays undefined on this path; the code shown only touches it when HAVE_GX is true.
    HAVE_GX = False

# Regions accepted by the "region" expectation and by the cleaning filter.
UK_REGIONS = ["London", "Manchester", "Leeds", "Bristol", "Glasgow"]
# The raw feed also carries non-UK rows, so it draws from two extra regions.
ALL_REGIONS = [*UK_REGIONS, "Dublin", "Paris"]


def _checks():
    """Return the three contract checks for the customer frame.

    Native GX expectation objects are built when ``great_expectations`` imported
    (``HAVE_GX``); otherwise the same checks are expressed as portable dicts.
    The list order is always: age range, region membership, email not null.
    """
    if not HAVE_GX:
        # Fallback: plain dicts, each naming the expectation type plus its kwargs.
        age_in_range = {
            "expectation_type": "expect_column_values_to_be_between",
            "kwargs": {"column": "age", "min_value": 18, "max_value": 120, "severity": "critical"},
        }
        region_in_uk = {
            "expectation_type": "expect_column_values_to_be_in_set",
            "kwargs": {"column": "region", "value_set": UK_REGIONS, "severity": "warning"},
        }
        email_present = {
            "expectation_type": "expect_column_values_to_not_be_null",
            "kwargs": {"column": "email"},
        }
        return [age_in_range, region_in_uk, email_present]

    age_in_range = GXE.ExpectColumnValuesToBeBetween(
        column="age", min_value=18, max_value=120, severity="critical"
    )
    region_in_uk = GXE.ExpectColumnValuesToBeInSet(
        column="region", value_set=UK_REGIONS, severity="warning"
    )
    # No explicit severity on the email check.
    email_present = GXE.ExpectColumnValuesToNotBeNull(column="email")
    return [age_in_range, region_in_uk, email_present]


def _data(n=240):
    """Build a synthetic customer frame with ``n`` rows.

    Regions cycle through ``ALL_REGIONS`` and ages follow ``16 + (7 * i) % 50``,
    so the frame mixes UK and non-UK rows and includes some under-18s.
    """
    ids = range(n)
    columns = {
        "customer_id": ids,
        "full_name": [f"Customer {i}" for i in ids],
        "email": [f"user{i}@example.com" for i in ids],
        "region": [ALL_REGIONS[i % len(ALL_REGIONS)] for i in ids],
        "age": [16 + (i * 7) % 50 for i in ids],  # some under 18
    }
    return pd.DataFrame(columns)


def main(out=None):
    """Run the example pipeline, validate it at two checkpoints and write the report.

    The raw frame and the filtered (adult, UK-only) frame are each profiled with
    the expectations from ``_checks()``; a summary of every evaluated expectation
    is printed after the HTML report is produced.

    Args:
        out: Path of the HTML report. When falsy, defaults to
            ``output/great_expectations_report.html`` under the parent of this
            file's directory.

    Returns:
        The report path (``out`` after the default has been applied).
    """
    out = out or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        "output",
        "great_expectations_report.html",
    )
    # A bare filename has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError — only create the directory when there is one.
    out_dir = os.path.dirname(out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    cf.trackNarwhals()
    try:
        cf.set_profiles({"*": [cf.rowCount, cf.dataSize]})
        cf.describe_process(
            "Streaming customer ingest with Great Expectations checkpoints at the raw and "
            "cleaned stages — the report flags the step where an expectation first fails, "
            "with critical failures hard (red) and warnings advisory (amber)."
        )

        # One profiler per checkpoint, each built from a fresh set of expectations.
        gx_raw = cf.greatExpectations(*_checks(), hard_severities=("critical", "error"))
        gx_clean = cf.greatExpectations(*_checks(), hard_severities=("critical", "error"))

        customers = nw.from_native(_data())

        with (
            cf.describe("Raw screen", purpose="Validate the incoming customer feed"),
            cf.force_profile(gx_raw, only="last"),
        ):
            screened = customers.select("customer_id", "full_name", "email", "region", "age")

        with (
            cf.describe("Clean customers", purpose="Adults (18+) resident in the UK"),
            cf.force_profile(gx_clean, only="last"),
        ):
            uk_adults = screened.filter(nw.col("age") >= 18).filter(
                nw.col("region").is_in(UK_REGIONS)
            )

        # The CSV is only a sink for the pipeline; keep its directory alive until
        # the report and model are built, then let the context manager remove it.
        with tempfile.TemporaryDirectory(prefix="ft_gx_") as tmp:
            uk_adults.write_csv(os.path.join(tmp, "uk_adults.csv"))

            html = cf.to_html(path=out, title="Great Expectations checkpoints — conformare report")
            model = cf.build_model(cf.store)

        stats = model["stats"]["expectations"]
        gx_state = "installed" if HAVE_GX else "NOT installed"
        print(f"wrote {out} ({len(html):,} bytes)  [great-expectations {gx_state}]")
        print(
            f"  evaluated={stats['evaluated']}, failed={stats['failed']}, hard={stats['failed_hard']}"
        )
        for node in model["nodes"]:
            node_exp = node.get("expectations")
            # Skip nodes with no expectations or whose evaluation status is not "ok".
            if not (node_exp and node_exp.get("status") == "ok"):
                continue
            for result in node_exp["results"]:
                mark = "PASS" if result["success"] else ("HARD" if result.get("hard") else "warn")
                print(
                    f"    [{mark}] {node['label']}: {result['expectation']} "
                    f"(severity={result.get('severity')})"
                )
    finally:
        # Run restore() even when the pipeline or report step raises, so the
        # trackNarwhals() setup is not left active (assumes restore() undoes it — confirm).
        cf.restore()
    return out


# Script entry point: build the report at the default output path.
if __name__ == "__main__":
    main()

Output report

Open in a new tab ↗


This site uses Just the Docs, a documentation theme for Jekyll.