Experimental: tracking Series & in-place assignment
Demonstrates the opt-in trackPandasSeries() and trackPandas__setitem__() switches: a group-wise Series aggregation that rejoins a frame is tracked, and each in-place df["x"] = ... write versions the frame. Shows what the experimental opt-ins capture (and why the DataFrame-first style remains the default recommendation).
Example code
Source: examples/example_pandas_experimental.py
"""Experimental pandas tracking: ``trackPandasSeries()`` and ``trackPandas__setitem__()``.
By default conformare tracks only the DataFrame-returning subset of pandas, so two common
idioms drop out of the lineage:
* a value that passes through a ``pandas.Series`` -- e.g. a group-wise aggregate
(``df.groupby(col)[col2].nunique()``) that is then joined back on, and
* in-place column assignment, ``df["new"] = ...``, which mutates the frame and returns
``None``.
Two **experimental, opt-in** switches extend tracking to cover them. Both warn on enable
and link to the Pandas best-practices page, because the cleaner fix is usually to keep
data in DataFrames (``groupby(..., as_index=False).agg(...)``) and prefer ``assign`` --
which are tracked with no opt-in at all. This example exists to show what the opt-ins
capture for pipelines you cannot (yet) rewrite that way.
Run: python examples/example_pandas_experimental.py
Then open output/pandas_experimental_report.html.
"""
import os
import warnings
import pandas as pd
import conformare as cf
def _make_pdf(n: int = 240) -> pd.DataFrame:
regions = ["England", "Scotland", "Wales", "Northern Ireland"]
rows = [
(i, regions[i % len(regions)], f"prod_{i % 7}", float((i * 13) % 300), 1 + (i % 9))
for i in range(n)
]
return pd.DataFrame(rows, columns=["customer_id", "region", "product", "spend", "tenure"])
def _reachable_from_roots(model):
    """Return the ids of every node reachable from the graph's root nodes.

    A root is a node that never appears as the ``target`` of an edge. ``model``
    is read as a mapping with ``"nodes"`` (items carrying ``"id"``) and
    ``"edges"`` (items carrying ``"source"`` and ``"target"``).
    """
    adjacency = {}
    targets = set()
    for edge in model["edges"]:
        adjacency.setdefault(edge["source"], []).append(edge["target"])
        targets.add(edge["target"])

    # One set lookup per node instead of rescanning every edge for every node.
    stack = [node["id"] for node in model["nodes"] if node["id"] not in targets]
    seen = set()
    while stack:
        node_id = stack.pop()
        if node_id not in seen:
            seen.add(node_id)
            stack += adjacency.get(node_id, [])
    return seen


def main(out=None):
    """Run the experimental-tracking example and write the HTML lineage report.

    Args:
        out: Path of the HTML report to write. When falsy, defaults to
            ``output/pandas_experimental_report.html`` under the parent of the
            directory containing this file.

    Returns:
        The path the report was written to.
    """
    out = out or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        "output",
        "pandas_experimental_report.html",
    )
    # A bare filename has an empty dirname and os.makedirs("") raises
    # FileNotFoundError, so only create the directory when there is one.
    out_dir = os.path.dirname(out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    cf.trackPandas()
    try:
        # Opt in to the experimental behaviour. We silence the (intentional) experimental
        # warnings here because this example is *about* them; in your own code, read the
        # warning -- it links to https://kaelonlloyd.github.io/conformare-docs/pandas.html.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", cf.ConformareExperimentalWarning)
            cf.trackPandasSeries()  # record pandas.Series as lineage nodes
            cf.trackPandas__setitem__()  # track df["x"] = ... by versioning the frame

        cf.set_profiles({"*": [cf.rowCount]})
        cf.describe_process(
            "Pandas pipeline that leans on Series aggregations and in-place column writes, "
            "tracked end to end via the experimental opt-ins."
        )

        df = _make_pdf()

        # 1) Series tracking: a group-wise aggregate comes back as a Series, rejoins a frame
        #    via reset_index, and is merged back. Without trackPandasSeries the Series (and so
        #    this whole branch) would be invisible and the merge parent would be an orphan.
        with cf.describe("Products per region (Series aggregation)"):
            products_per_region = df.groupby("region")["product"].nunique()  # -> Series
            region_lookup = products_per_region.reset_index(name="products_in_region")  # -> frame
            enriched = df.merge(region_lookup, on="region", how="left")

        # 2) __setitem__ tracking: in-place column creation. Each write re-versions the frame
        #    (old -> new edge) and captures its column <- expression. Consecutive writes on the
        #    same frame form one chain, so "Compress chained operations" rolls them into a
        #    single node in the diagram.
        with cf.describe("Derived columns (in-place assignment)"):
            enriched["spend_per_tenure"] = enriched["spend"] / enriched["tenure"]
            enriched["high_value"] = enriched["spend"] >= 150

        # For contrast, the DataFrame-first equivalent of step 2 -- tracked with NO opt-in.
        # The result is not used further; the call is made for the lineage it records.
        with cf.describe("Same thing, the recommended way (assign)"):
            recommended = enriched.assign(
                spend_per_tenure_v2=enriched["spend"] / enriched["tenure"],
                high_value_v2=enriched["spend"] >= 150,
            )

        html = cf.to_html(out, title="Experimental pandas tracking (Series + __setitem__)")

        # Confirm the Series branch and the in-place writes are part of one connected graph.
        model = cf.build_model(cf.store)
        reachable = _reachable_from_roots(model)

        ops = [e.op for e in cf.lineage() if e.kind == "op"]
    finally:
        # Run restore() even when a step above raises, so a failure does not skip the
        # cleanup call made on the success path.
        # NOTE(review): restore() presumably undoes the tracking enabled above -- confirm
        # against the conformare documentation.
        cf.restore()

    print(f"wrote {out} ({len(html):,} bytes)")
    print(f" ops : {ops}")
    print(f" series nodes : {sum(1 for o in ops if o.startswith(('group_by.', 'series.')))}")
    print(f" setitem nodes: {sum(1 for o in ops if o == 'setitem')}")
    print(f" connected : {len(reachable)}/{model['stats']['nodes']} nodes reachable from roots")
    return out
# Script entry point: run the example only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()