Native pandas tracking

Idiomatic pandas code (boolean indexing such as `df[df.col == 1]`, `merge`, `groupby`) is tracked in place, without rewriting it.

Open the full report ↗

Example code

Source: examples/example_pandas.py

"""Native-pandas pipeline tracked with ``trackPandas()``.

Demonstrates that idiomatic pandas -- boolean indexing (``df[df.col == 1]``), column
projection (``df[[...]]``), ``query``, ``merge`` and ``groupby.agg`` -- is tracked with
the authored predicate captured from source, without rewriting the code in a method API.

Run:  python examples/example_pandas.py
Then open output/pandas_report.html (in the repository root, one level above examples/).
"""

import os
import tempfile

import pandas as pd

import conformare as cf

# Regions kept by the UK filter in the "Clean customers" step.
UK_REGIONS = ["London", "Manchester", "Leeds", "Bristol", "Glasgow"]
# Every region assigned to synthetic customers: the UK regions followed by two
# non-UK regions, so the UK filter has rows to drop.
ALL_REGIONS = [*UK_REGIONS, "Dublin", "Paris"]


def write_inputs(folder, n_customers=300, n_streams=900):
    """Generate the two synthetic input CSVs used by the example.

    Writes ``customers.csv`` (id, name, email, region, age) and then
    ``streams.csv`` (customer id, movie id, watch minutes) into *folder*.
    Every value is a deterministic function of the row index, so repeated
    runs produce identical files.

    Args:
        folder: directory that receives both files (it must already exist;
            nothing here creates it).
        n_customers: number of customer rows.
        n_streams: number of stream rows. Row ``r`` references customer
            ``r % n_customers``, so ``n_customers`` must be non-zero whenever
            ``n_streams`` is.
    """
    ids = range(n_customers)
    region_count = len(ALL_REGIONS)
    customers_path = os.path.join(folder, "customers.csv")
    pd.DataFrame(
        {
            "customer_id": ids,
            "full_name": [f"Customer {cid}" for cid in ids],
            "email": [f"user{cid}@example.com" for cid in ids],
            # Cycle through every region so some customers fall outside the UK.
            "region": [ALL_REGIONS[cid % region_count] for cid in ids],
            # Ages span 16..65, so some customers are minors.
            "age": [16 + (cid * 7) % 50 for cid in ids],
        }
    ).to_csv(customers_path, index=False)

    rows = range(n_streams)
    streams = pd.DataFrame(
        {
            "customer_id": [row % n_customers for row in rows],
            "movie_id": [row % 25 for row in rows],
            "watch_minutes": [float((row * 13) % 200) for row in rows],
        }
    )
    streams.to_csv(os.path.join(folder, "streams.csv"), index=False)


def _configure_tracking():
    """Register profiles, the sensitive column and the process description with conformare."""
    cf.set_profiles(
        {
            "*": [
                cf.rowCount,
                cf.dataSize,
                cf.nullFraction(columns="all"),
                cf.histogram(columns=["age", "watch_minutes", "avg_watch_minutes"]),
                cf.iqrOutliers(columns=["age", "watch_minutes", "avg_watch_minutes"]),
            ]
        }
    )
    cf.profile_sources(True)
    cf.mark_sensitive("region", tag="location", category="Location", severity="low")
    cf.describe_process(
        "Native-pandas streaming analytics: clean UK adult customers and report watch "
        "time by region, written in idiomatic pandas (boolean indexing, projection, "
        "merge, groupby)."
    )


def _run_pipeline(folder):
    """Read the inputs from *folder*, run the described pandas steps and write the sink CSV.

    Returns:
        ``(by_region, heavy)``: the average watch minutes per UK region, and the
        UK-adult stream rows with ``watch_minutes > 100``.
    """
    customers = pd.read_csv(os.path.join(folder, "customers.csv"))  # source "customers"
    streams = pd.read_csv(os.path.join(folder, "streams.csv"))  # source "streams"

    with cf.describe(
        "Clean customers",
        purpose="Adults (18+) resident in the UK",
        definition_owner="data-governance",
        risks=cf.risk(
            "privacy.pii_exposure",
            "compliance.gdpr",
            note="name + email present",
            mitigation="Drop email before export",
            owner="data-governance",
        ),
    ):
        adults = customers[customers.age >= 18]  # boolean indexing -> filter
        uk_adults = adults[adults.region.isin(UK_REGIONS)]  # boolean indexing -> filter
        uk_adults = uk_adults[["customer_id", "full_name", "region", "age"]]  # projection -> select

    with cf.describe(
        "Watch time by region",
        purpose="Average watch minutes per UK region",
        definition_owner="analytics-product",
    ):
        joined = uk_adults.merge(streams, on="customer_id", how="inner")  # merge
        by_region = joined.groupby("region", as_index=False).agg(
            avg_watch_minutes=("watch_minutes", "mean")
        )  # groupby.agg

    with cf.describe(
        "Heavy watchers", purpose="Sessions over 100 minutes", definition_owner="analytics-product"
    ):
        heavy = joined.query("watch_minutes > 100")  # query -> filter

    by_region.to_csv(os.path.join(folder, "watch_by_region.csv"), index=False)  # sink
    return by_region, heavy


def _write_report(out):
    """Render the tracked run as an HTML report at *out* and print a one-line summary."""
    cf.to_html(path=out, title="Native pandas pipeline — conformare report")
    m = cf.build_model(cf.store)
    # Report the size of the written file. If to_html returns a str, its length counts
    # characters, which differs from bytes because the title has a non-ASCII dash.
    print(f"wrote {out} ({os.path.getsize(out):,} bytes)")
    print(
        f"  nodes={m['stats']['nodes']}, columns={m['stats']['columns']}, "
        f"risks={m['stats']['risks']}, contexts={len(m['groups'])}"
    )


def main(out=None):
    """Run the native-pandas example end to end and write its HTML report.

    Args:
        out: destination path of the report. Falsy values fall back to
            ``output/pandas_report.html`` under the parent of this file's directory.

    Returns:
        The report path that was written (``out`` after the default is applied).
    """
    out = out or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "output", "pandas_report.html"
    )
    # abspath: for a bare file name, dirname() is "" and os.makedirs("") would raise.
    os.makedirs(os.path.dirname(os.path.abspath(out)), exist_ok=True)

    # The input CSVs and the sink are scratch data: delete them once the report is written.
    with tempfile.TemporaryDirectory(prefix="ft_pandas_") as tmp:
        write_inputs(tmp)

        cf.trackPandas()
        try:
            _configure_tracking()
            _run_pipeline(tmp)
            _write_report(out)
        finally:
            # Always run the teardown, even when a step above raises.
            cf.restore()
    return out


if __name__ == "__main__":
    main()

Output report

Open in a new tab ↗


This site uses Just the Docs, a documentation theme for Jekyll.