Customer pipeline (Narwhals)
A Narwhals pipeline with describe()/risk() governance, PII sensitivity tagging, and source & sink capture.
Example code
Source: examples/example_html_report.py
"""End-to-end: a richer pipeline with describe()/risk()/sensitivity, exported to
an interactive HTML report.
Run: python examples/example_html_report.py
Then open output/report.html (under the repository root) in a browser.
"""
import os
import tempfile

import narwhals as nw
import pandas as pd

import conformare as cf
def drop_minors(df):
    """Return only the rows of *df* whose ``age`` column is 18 or more."""
    is_adult = nw.col("age") >= 18
    return df.filter(is_adult)
def standardise(df):
    """Keep adult rows and add a ``decade`` column (age floored to a multiple of 10).

    Delegating the filter to drop_minors() gives the nested function path
    standardise().drop_minors().
    """
    adult_rows = drop_minors(df)
    decade_expr = (nw.col("age") // 10) * 10
    return adult_rows.with_columns(decade=decade_expr)
def main(out=None):
    """Run the demo customer pipeline under conformare tracking and export an HTML report.

    Seeds two small CSVs (customers, orders) in a temporary directory, reads them
    back as Narwhals frames, runs a four-stage pipeline wrapped in cf.describe()
    blocks, writes the final result to CSV, then renders the captured lineage with
    cf.to_html().

    Args:
        out: Path of the HTML report to write. Defaults to
            ``output/report.html`` under the parent of this file's directory.
            A bare filename (no directory part) is written to the current
            working directory.

    Returns:
        The per-country spend aggregate (sum of ``amount`` grouped by ``country``).
    """
    out = out or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "output", "report.html"
    )
    # os.makedirs("") raises FileNotFoundError, so only create a directory when
    # ``out`` actually has one (e.g. not for a bare "report.html").
    out_dir = os.path.dirname(out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # The scratch CSVs only need to live until the report is rendered;
    # TemporaryDirectory removes them afterwards instead of leaking a
    # conformare_* directory on every run.
    with tempfile.TemporaryDirectory(prefix="conformare_") as tmp:
        cf.trackNarwhals()
        cf.track_functions(True)
        try:
            # Profile distributions at every step so a column can be "followed"
            # through the pipeline in the report's distribution follower.
            cf.set_profiles(
                {
                    "*": [
                        cf.rowCount,
                        cf.dataSize,
                        cf.histogram(columns="all"),
                        cf.nullFraction(columns="all"),
                    ],
                }
            )

            # Seed two CSVs, then read them back -- the read captures the load
            # location onto the source nodes.
            cust_path = os.path.join(tmp, "customers.csv")
            ord_path = os.path.join(tmp, "orders.csv")
            pd.DataFrame(
                {
                    "customer_id": [1, 2, 3, 4, 5, 6],
                    "full_name": ["Ana", "Ben", "Cara", "Dan", "Eve", "Finn"],
                    "email": ["a@x.com", "b@x.com", "c@x.com", "d@x.com", "e@x.com", "f@x.com"],
                    "date_of_birth": ["1990-01-01"] * 6,
                    "age": [34, 22, 41, 29, 38, 17],
                    "country": ["UK", "UK", "IE", "UK", "IE", "UK"],
                }
            ).to_csv(cust_path, index=False)
            pd.DataFrame(
                {
                    "customer_id": [1, 2, 3, 4, 5],
                    "amount": [100.0, 50.0, 75.0, 200.0, 30.0],
                }
            ).to_csv(ord_path, index=False)
            customers = nw.from_native(pd.read_csv(cust_path))  # source: customers.csv
            orders = nw.from_native(pd.read_csv(ord_path))  # source: orders.csv

            # A manual sensitivity assertion the heuristics wouldn't catch on name alone.
            cf.mark_sensitive(
                "country", tag="location", category="Location", severity="medium"
            )

            with cf.describe(
                "Clean customer data",
                purpose="Keep adults and standardise the customer table",
                risks=cf.risk(
                    "privacy.pii_exposure",
                    "compliance.gdpr",
                    note="email + DOB retained through the pipeline",
                    mitigation="Mask email and DOB before any export",
                    owner="data-platform",
                ),
            ):  # owned -> low governance
                adults = standardise(customers)  # nested fn path: standardise().drop_minors()

            with cf.describe(
                "Enrich with orders",
                purpose="Attach order totals per customer",
                risks=cf.risk(
                    "ops.expensive_action",
                    note="join shuffles both sides",
                    mitigation="Broadcast the small orders table",
                ),
            ):  # unowned -> medium
                enriched = adults.join(orders, on="customer_id", how="left")

            with cf.describe("Aggregate", purpose="Spend by country"):
                by_country = enriched.group_by("country").agg(nw.col("amount").sum())

            # A chained expression with no intermediate assignments: the
            # filter/with_columns/select intermediates are anonymous, so they
            # roll up into a single `report` node.
            with cf.describe(
                "Final report",
                purpose="Flag and trim high-value customers",
                risks="fairness.proxy_variable",
            ):  # no mitigation -> high governance
                report = (
                    enriched.filter(nw.col("amount") > 40)
                    .with_columns(high_value=nw.col("amount") > 100)
                    .select("customer_id", "amount", "high_value")
                )

            # Write the result -- captured as a sink node with its location.
            report.write_csv(os.path.join(tmp, "high_value_customers.csv"))
        finally:
            # Switch function tracking back off even if a pipeline step raised.
            cf.track_functions(False)

        # Render before the temp directory is removed, in case the report
        # reads the captured source/sink locations.
        html = cf.to_html(path=out, title="Customer pipeline — conformare report")
        print(f"wrote {out} ({len(html):,} bytes)")
        print(
            f" nodes={len(cf.build_model(cf.store)['nodes'])}, "
            f"columns={len(cf.store.all_columns())}, events={len(cf.lineage())}"
        )
    return by_country
if __name__ == "__main__":
    # Script entry point: write the report to the default location.
    main(out=None)