Native pandas tracking
Idiomatic pandas (df[df.col == 1], merge, groupby) is tracked in place.
Example code
Source: examples/example_pandas.py
"""Native-pandas pipeline tracked with ``trackPandas()``.
Demonstrates that idiomatic pandas -- boolean indexing (``df[df.col == 1]``), column
projection (``df[[...]]``), ``query``, ``merge`` and ``groupby.agg`` -- is tracked with
the authored predicate captured from source, without rewriting the code in a method API.
Run: python examples/example_pandas.py
Then open output/pandas_report.html (created one directory above examples/).
"""
import os
import tempfile
import pandas as pd
import conformare as cf
# Regions accepted by the "resident in the UK" customer filter.
UK_REGIONS = [
    "London",
    "Manchester",
    "Leeds",
    "Bristol",
    "Glasgow",
]
# Every region assigned to synthetic customers: the UK ones plus two others.
ALL_REGIONS = [*UK_REGIONS, "Dublin", "Paris"]
def write_inputs(folder, n_customers=300, n_streams=900):
    """Write the synthetic ``customers.csv`` and ``streams.csv`` into *folder*.

    Every value is computed from the row index alone, so the generated files
    are the same on every run with the same arguments.

    Args:
        folder: Directory that receives both CSV files.
        n_customers: Number of rows written to ``customers.csv``.
        n_streams: Number of rows written to ``streams.csv``; each row's
            ``customer_id`` is the row index modulo ``n_customers``.
    """
    customer_ids = range(n_customers)
    customer_columns = {
        "customer_id": customer_ids,
        "full_name": [f"Customer {cid}" for cid in customer_ids],
        "email": [f"user{cid}@example.com" for cid in customer_ids],
        # Walk through ALL_REGIONS in order, wrapping around at the end.
        "region": [ALL_REGIONS[cid % len(ALL_REGIONS)] for cid in customer_ids],
        # Ages land in 16..65, so a few customers are below 18.
        "age": [16 + (cid * 7) % 50 for cid in customer_ids],
    }
    customers_path = os.path.join(folder, "customers.csv")
    pd.DataFrame(customer_columns).to_csv(customers_path, index=False)

    stream_ids = range(n_streams)
    stream_columns = {
        "customer_id": [sid % n_customers for sid in stream_ids],
        "movie_id": [sid % 25 for sid in stream_ids],
        # Watch time in whole minutes stored as float, 0.0..199.0.
        "watch_minutes": [float((sid * 13) % 200) for sid in stream_ids],
    }
    streams_path = os.path.join(folder, "streams.csv")
    pd.DataFrame(stream_columns).to_csv(streams_path, index=False)
def main(out=None):
    """Run the tracked native-pandas example pipeline and write the HTML report.

    The pipeline reads two synthetic CSV inputs, filters and projects the
    customers, joins them to the streams, aggregates watch time per region,
    and writes the aggregate to a CSV sink, all in plain pandas syntax
    while ``conformare`` tracking is active.

    Args:
        out: Path of the HTML report to write. When falsy, defaults to
            ``output/pandas_report.html`` one directory above this file's
            directory.

    Returns:
        The report path ``out`` (after the default has been applied).
    """
    out = out or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "output", "pandas_report.html"
    )
    # A bare filename such as "report.html" has an empty dirname, and
    # os.makedirs("") raises FileNotFoundError, so only create real parents.
    out_dir = os.path.dirname(out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # The scratch CSVs are only needed while the pipeline runs; the context
    # manager removes the directory afterwards instead of leaking it.
    with tempfile.TemporaryDirectory(prefix="ft_pandas_") as tmp:
        # Inputs are generated before tracking is switched on.
        write_inputs(tmp)

        cf.trackPandas()
        try:
            cf.set_profiles(
                {
                    "*": [
                        cf.rowCount,
                        cf.dataSize,
                        cf.nullFraction(columns="all"),
                        cf.histogram(columns=["age", "watch_minutes", "avg_watch_minutes"]),
                        cf.iqrOutliers(columns=["age", "watch_minutes", "avg_watch_minutes"]),
                    ]
                }
            )
            cf.profile_sources(True)
            cf.mark_sensitive("region", tag="location", category="Location", severity="low")
            cf.describe_process(
                "Native-pandas streaming analytics: clean UK adult customers and report watch "
                "time by region, written in idiomatic pandas (boolean indexing, projection, "
                "merge, groupby)."
            )

            customers = pd.read_csv(os.path.join(tmp, "customers.csv"))  # source "customers"
            streams = pd.read_csv(os.path.join(tmp, "streams.csv"))  # source "streams"

            with cf.describe(
                "Clean customers",
                purpose="Adults (18+) resident in the UK",
                definition_owner="data-governance",
                risks=cf.risk(
                    "privacy.pii_exposure",
                    "compliance.gdpr",
                    note="name + email present",
                    mitigation="Drop email before export",
                    owner="data-governance",
                ),
            ):
                adults = customers[customers.age >= 18]  # boolean indexing -> filter
                uk_adults = adults[adults.region.isin(UK_REGIONS)]  # boolean indexing -> filter
                uk_adults = uk_adults[
                    ["customer_id", "full_name", "region", "age"]
                ]  # projection -> select

            with cf.describe(
                "Watch time by region",
                purpose="Average watch minutes per UK region",
                definition_owner="analytics-product",
            ):
                joined = uk_adults.merge(streams, on="customer_id", how="inner")  # merge
                by_region = joined.groupby("region", as_index=False).agg(
                    avg_watch_minutes=("watch_minutes", "mean")
                )  # group_by.agg

            with cf.describe(
                "Heavy watchers",
                purpose="Sessions over 100 minutes",
                definition_owner="analytics-product",
            ):
                heavy = joined.query("watch_minutes > 100")  # query -> filter

            by_region.to_csv(os.path.join(tmp, "watch_by_region.csv"), index=False)  # sink

            html = cf.to_html(path=out, title="Native pandas pipeline — conformare report")
            m = cf.build_model(cf.store)
            print(f"wrote {out} ({len(html):,} bytes)")
            print(
                f"  nodes={m['stats']['nodes']}, columns={m['stats']['columns']}, "
                f"risks={m['stats']['risks']}, contexts={len(m['groups'])}"
            )
            # ``heavy`` is not consumed further; referencing both results keeps
            # them from being reported as unused.
            _ = (by_region, heavy)
        finally:
            # Pair every trackPandas() with restore(), even when a step above
            # raises. NOTE(review): presumably restore() undoes the tracking
            # installed by trackPandas() -- confirm against conformare docs.
            cf.restore()
    return out
# Script entry point: run the example only when executed directly, not on import.
if __name__ == "__main__":
    main()