Customer pipeline (Narwhals)

A Narwhals pipeline demonstrating describe()/risk() governance annotations, PII sensitivity tagging, and capture of data sources and sinks.

Open the full report ↗

Example code

Source: examples/example_html_report.py

"""End-to-end: a richer pipeline with describe()/risk()/sensitivity, exported to
an interactive HTML report.

Run:  python examples/example_html_report.py
Then open output/report.html (in the directory that contains examples/) in a browser.
"""

import os

import narwhals as nw
import pandas as pd

import conformare as cf


def drop_minors(df):
    """Return only the rows of *df* whose ``age`` column is 18 or over."""
    is_adult = nw.col("age") >= 18
    return df.filter(is_adult)


def standardise(df):
    """Keep adult rows and add a ``decade`` column (age rounded down to a multiple of 10).

    The filtering is delegated to drop_minors, which gives the nested function
    path standardise().drop_minors().
    """
    # Keep the frame bound to `adults`: the intermediate is a named step in the lineage.
    adults = drop_minors(df)
    decade_of_age = (nw.col("age") // 10) * 10
    return adults.with_columns(decade=decade_of_age)


def main(out=None):
    """Run the example customer pipeline and export an interactive HTML report.

    Enables conformare's Narwhals and function tracking, then seeds two CSV
    sources in a temporary directory. It runs the annotated pipeline, writes one
    CSV sink, and renders everything captured to an HTML report.

    Args:
        out: Destination path for the HTML report. If falsy, defaults to
            ``output/report.html`` under the parent of this script's directory.

    Returns:
        The per-country spend frame (``by_country``) produced by the pipeline.
    """
    import tempfile

    out = out or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "output", "report.html"
    )
    # A bare filename (e.g. "report.html") has no directory part, and
    # os.makedirs("") would raise FileNotFoundError.
    out_dir = os.path.dirname(out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # The seeded sources and the written sink only need to exist until the report
    # has been rendered. TemporaryDirectory removes them afterwards instead of
    # leaving a conformare_* directory behind on every run.
    with tempfile.TemporaryDirectory(prefix="conformare_") as tmp:
        cf.trackNarwhals()
        cf.track_functions(True)
        try:
            # Profile distributions at every step so a column can be "followed"
            # through the pipeline in the report's distribution follower.
            cf.set_profiles(
                {
                    "*": [
                        cf.rowCount,
                        cf.dataSize,
                        cf.histogram(columns="all"),
                        cf.nullFraction(columns="all"),
                    ],
                }
            )

            # Seed two CSVs, then read them back -- the read captures the load
            # location onto the source nodes.
            cust_path = os.path.join(tmp, "customers.csv")
            ord_path = os.path.join(tmp, "orders.csv")
            pd.DataFrame(
                {
                    "customer_id": [1, 2, 3, 4, 5, 6],
                    "full_name": ["Ana", "Ben", "Cara", "Dan", "Eve", "Finn"],
                    "email": [
                        "a@x.com",
                        "b@x.com",
                        "c@x.com",
                        "d@x.com",
                        "e@x.com",
                        "f@x.com",
                    ],
                    "date_of_birth": ["1990-01-01"] * 6,
                    "age": [34, 22, 41, 29, 38, 17],
                    "country": ["UK", "UK", "IE", "UK", "IE", "UK"],
                }
            ).to_csv(cust_path, index=False)
            pd.DataFrame(
                {
                    "customer_id": [1, 2, 3, 4, 5],
                    "amount": [100.0, 50.0, 75.0, 200.0, 30.0],
                }
            ).to_csv(ord_path, index=False)

            customers = nw.from_native(pd.read_csv(cust_path))  # source: customers.csv
            orders = nw.from_native(pd.read_csv(ord_path))  # source: orders.csv

            # A manual sensitivity assertion the heuristics wouldn't catch on name alone.
            cf.mark_sensitive("country", tag="location", category="Location", severity="medium")

            with cf.describe(
                "Clean customer data",
                purpose="Keep adults and standardise the customer table",
                risks=cf.risk(
                    "privacy.pii_exposure",
                    "compliance.gdpr",
                    note="email + DOB retained through the pipeline",
                    mitigation="Mask email and DOB before any export",
                    owner="data-platform",
                ),
            ):  # owned -> low governance
                adults = standardise(customers)  # nested fn path: standardise().drop_minors()

            with cf.describe(
                "Enrich with orders",
                purpose="Attach order totals per customer",
                risks=cf.risk(
                    "ops.expensive_action",
                    note="join shuffles both sides",
                    mitigation="Broadcast the small orders table",
                ),
            ):  # unowned -> medium
                enriched = adults.join(orders, on="customer_id", how="left")

            with cf.describe("Aggregate", purpose="Spend by country"):
                by_country = enriched.group_by("country").agg(nw.col("amount").sum())

            # A chained expression with no intermediate assignments. The filter,
            # with_columns and select intermediates are anonymous, so they roll up
            # into a single `report` node.
            with cf.describe(
                "Final report",
                purpose="Flag and trim high-value customers",
                risks="fairness.proxy_variable",
            ):  # no mitigation -> high governance
                report = (
                    enriched.filter(nw.col("amount") > 40)
                    .with_columns(high_value=nw.col("amount") > 100)
                    .select("customer_id", "amount", "high_value")
                )

            # Write the result -- captured as a sink node with its location.
            report.write_csv(os.path.join(tmp, "high_value_customers.csv"))
        finally:
            # Always turn function tracking off, even when the pipeline raises, so
            # the global tracking state does not leak into later code. On success
            # this still runs before to_html, as in the original ordering.
            cf.track_functions(False)

        # Render while the temporary sources and sink still exist on disk.
        html = cf.to_html(path=out, title="Customer pipeline — conformare report")
        print(f"wrote {out} ({len(html):,} bytes)")
        print(
            f"  nodes={len(cf.build_model(cf.store)['nodes'])}, "
            f"columns={len(cf.store.all_columns())}, events={len(cf.lineage())}"
        )
    return by_country


if __name__ == "__main__":
    # Script entry point: write the report to main()'s default location.
    main(out=None)

Output report

Open in a new tab ↗


This site uses Just the Docs, a documentation theme for Jekyll.