Experimental: tracking Series & in-place assignment

This example demonstrates the opt-in trackPandasSeries() and trackPandas__setitem__() switches: a group-wise Series aggregation that rejoins a frame is tracked, and in-place df["x"] = ... writes version the frame. It shows what the experimental opt-ins capture (and why the DataFrame-first style is the default recommendation).

Open the full report ↗

Example code

Source: examples/example_pandas_experimental.py

"""Experimental pandas tracking: ``trackPandasSeries()`` and ``trackPandas__setitem__()``.

By default conformare tracks only the DataFrame-returning subset of pandas, so two common
idioms drop out of the lineage:

* a value that passes through a ``pandas.Series`` -- e.g. a group-wise aggregate
  (``df.groupby(col)[col2].nunique()``) that is then joined back on, and
* in-place column assignment, ``df["new"] = ...``, which mutates the frame and returns
  ``None``.

Two **experimental, opt-in** switches extend tracking to cover them. Both warn on enable
and link to the Pandas best-practices page, because the cleaner fix is usually to keep
data in DataFrames (``groupby(..., as_index=False).agg(...)``) and prefer ``assign`` --
which are tracked with no opt-in at all. This example exists to show what the opt-ins
capture for pipelines you cannot (yet) rewrite that way.

Run:  python examples/example_pandas_experimental.py
Then open output/pandas_experimental_report.html.
"""

import os
import warnings

import pandas as pd

import conformare as cf


def _make_pdf(n: int = 240) -> pd.DataFrame:
    regions = ["England", "Scotland", "Wales", "Northern Ireland"]
    rows = [
        (i, regions[i % len(regions)], f"prod_{i % 7}", float((i * 13) % 300), 1 + (i % 9))
        for i in range(n)
    ]
    return pd.DataFrame(rows, columns=["customer_id", "region", "product", "spend", "tenure"])


def main(out=None):
    """Run the experimental-tracking example and produce its HTML lineage report.

    Enables DataFrame tracking plus the two experimental opt-ins, runs a small pipeline
    that uses a Series aggregation and in-place column writes, renders the report, and
    prints a summary of the recorded operations and graph connectivity.

    Args:
        out: Path of the HTML report. When falsy, defaults to
            ``output/pandas_experimental_report.html`` under the parent of the directory
            containing this file.

    Returns:
        The report path (``out`` after defaulting).
    """
    out = out or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        "output",
        "pandas_experimental_report.html",
    )
    # A bare filename has an empty dirname and os.makedirs("") raises FileNotFoundError,
    # so only create the directory when the path actually names one.
    out_dir = os.path.dirname(out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    cf.trackPandas()
    # Opt in to the experimental behaviour. We silence the (intentional) experimental
    # warnings here because this example is *about* them; in your own code, read the
    # warning -- it links to https://kaelonlloyd.github.io/conformare-docs/pandas.html.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", cf.ConformareExperimentalWarning)
        cf.trackPandasSeries()  # record pandas.Series as lineage nodes
        cf.trackPandas__setitem__()  # track df["x"] = ... by versioning the frame

    cf.set_profiles({"*": [cf.rowCount]})
    cf.describe_process(
        "Pandas pipeline that leans on Series aggregations and in-place column writes, "
        "tracked end to end via the experimental opt-ins."
    )

    df = _make_pdf()

    # 1) Series tracking: a group-wise aggregate comes back as a Series, rejoins a frame
    #    via reset_index, and is merged back. Without trackPandasSeries the Series (and so
    #    this whole branch) would be invisible and the merge parent would be an orphan.
    with cf.describe("Products per region (Series aggregation)"):
        products_per_region = df.groupby("region")["product"].nunique()  # -> Series
        region_lookup = products_per_region.reset_index(name="products_in_region")  # -> frame
        enriched = df.merge(region_lookup, on="region", how="left")

    # 2) __setitem__ tracking: in-place column creation. Each write re-versions the frame
    #    (old -> new edge) and captures its column <- expression. Consecutive writes on the
    #    same frame form one chain, so "Compress chained operations" rolls them into a
    #    single node in the diagram.
    with cf.describe("Derived columns (in-place assignment)"):
        enriched["spend_per_tenure"] = enriched["spend"] / enriched["tenure"]
        enriched["high_value"] = enriched["spend"] >= 150

    # For contrast, the DataFrame-first equivalent of step 2 -- tracked with NO opt-in:
    with cf.describe("Same thing, the recommended way (assign)"):
        recommended = enriched.assign(
            spend_per_tenure_v2=enriched["spend"] / enriched["tenure"],
            high_value_v2=enriched["spend"] >= 150,
        )

    html = cf.to_html(out, title="Experimental pandas tracking (Series + __setitem__)")

    # Confirm the Series branch and the in-place writes are part of one connected graph.
    model = cf.build_model(cf.store)
    adj = {}
    for e in model["edges"]:
        adj.setdefault(e["source"], []).append(e["target"])
    # Roots are nodes with no incoming edge. Collect every edge target once so each node
    # is a set lookup instead of a rescan of the whole edge list.
    targets = {e["target"] for e in model["edges"]}
    sources = [n["id"] for n in model["nodes"] if n["id"] not in targets]
    # Iterative depth-first walk from the roots; `seen` ends up holding every reachable node.
    seen, stack = set(), list(sources)
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack += adj.get(node, [])

    ops = [e.op for e in cf.lineage() if e.kind == "op"]
    cf.restore()
    series_nodes = sum(1 for o in ops if o.startswith(("group_by.", "series.")))
    setitem_nodes = ops.count("setitem")
    print(f"wrote {out} ({len(html):,} bytes)")
    print(f"  ops          : {ops}")
    print(f"  series nodes : {series_nodes}")
    print(f"  setitem nodes: {setitem_nodes}")
    print(f"  connected    : {len(seen)}/{model['stats']['nodes']} nodes reachable from roots")
    return out


# Script entry point: running this file directly calls main() with its default output path.
if __name__ == "__main__":
    main()

Output report

Open in a new tab ↗


This site uses Just the Docs, a documentation theme for Jekyll.