Fleet: recording runs

Record runs to a central store with cf.record_run, then roll them up. This example records two runs in which the implementation changes but the risk assessment does not – the fleet dashboard flags this as needing review.

Open the full report ↗

Example code

Source: examples/example_fleet_recording.py

"""Fleet: record runs to a central store and surface implementation-vs-risk drift.

Recording is **explicit** -- call ``cf.record_run(pipeline=...)`` to append an immutable
run record to a configured store. ``pipeline`` is the identity across runs; ``env`` /
``git_sha`` / ``tags`` are attributes. This example uses the portable JSON store so it runs
anywhere; in a Spark-only environment (no blob access) configure the Spark-table writer
instead -- nothing else changes::

    cf.configure_store(writer="spark_table", spark=spark,
                       catalog="gov", schema="conformare")

It records two runs of the same pipeline+context where the **implementation changes but the
risk assessment does not** -- exactly the stale-governance smell the review report flags.

Run:  python examples/example_fleet_recording.py
"""

import os
import shutil

import pandas as pd

import conformare as cf

# Pipeline identity shared by every recorded run; passed to cf.record_run and
# cf.fleet.last_healthy_run so both runs roll up under the same pipeline.
PIPELINE: str = "customer_scoring"


def _run_pipeline(op: str) -> None:
    """One run of a tiny pipeline with a described, risk-bearing context.

    Resets conformare's state (``cf.restore``, ``cf.store.clear``,
    ``cf.reset_context``), re-enables pandas tracking (``cf.trackPandas``) with an
    empty profile set, then performs one derived-column computation inside a
    ``cf.describe`` + ``cf.risk`` context. The recording itself is done by the
    caller via ``cf.record_run``.

    Args:
        op: ``"div"`` computes ``spend / tenure``; any other value computes
            ``spend / tenure * 1.2`` (the "changed implementation"). The declared
            risk is identical in both cases.
    """
    # Start each run from a clean slate so the two recorded runs do not share
    # captured operations or context state.
    cf.restore()
    cf.store.clear()
    cf.reset_context()
    cf.trackPandas()
    # NOTE(review): presumably an empty mapping means "no data profiles" -- confirm.
    cf.set_profiles({})
    df = pd.DataFrame(
        {"region": ["UK", "US", "UK"], "spend": [120.0, 80.0, 200.0], "tenure": [3, 2, 5]}
    )
    # The describe/risk context is the same for both ops; only the body below differs.
    with (
        cf.describe("Engineer features", purpose="Per-customer derived columns"),
        cf.risk("pii-in-features", severity="high", mitigation="reviewed by DPO", owner="Alice"),
    ):
        # The assign() result is intentionally discarded; presumably the tracker
        # captures the operation itself (TODO confirm against cf.trackPandas).
        if op == "div":
            df.assign(value=df["spend"] / df["tenure"])
        else:  # implementation changed -- but the risk above is unchanged
            df.assign(value=df["spend"] / df["tenure"] * 1.2)


def main(out_dir=None):
    """Record two runs of ``PIPELINE``, roll the store up, and write the dashboard.

    Both runs share the same pipeline and described context. The second run
    changes the implementation (``_run_pipeline("mul")``) while the declared risk
    stays the same, so the fleet review report is expected to list that context.

    Args:
        out_dir: Directory for the JSON fleet store and the HTML dashboard.
            When falsy, defaults to an ``output`` directory in the parent of
            this file's directory.

    Returns:
        The path of the JSON fleet store directory. The directory is deleted
        and re-created on each call.
    """
    out_dir = out_dir or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "output"
    )
    store_dir = os.path.join(out_dir, "fleet_store")
    # Start from an empty store so the roll-up only sees this example's runs.
    if os.path.exists(store_dir):
        shutil.rmtree(store_dir)
    # The dashboard below is written directly into out_dir; make sure it exists.
    os.makedirs(out_dir, exist_ok=True)
    dest = {"writer": "json", "path": store_dir}

    # Two runs over time: same pipeline + context, implementation changes, risk does not.
    # Each entry: (op passed to _run_pipeline, git_sha, run_id, timestamp).
    runs = (
        ("div", "aaaa111", "run-1", "2026-06-01T09:00:00Z"),
        ("mul", "bbbb222", "run-2", "2026-06-08T09:00:00Z"),
    )
    try:
        for op, git_sha, run_id, ts in runs:
            _run_pipeline(op)
            cf.record_run(
                PIPELINE,
                dest=dest,
                env="prod",
                git_sha=git_sha,
                run_id=run_id,
                ts=ts,
            )
    finally:
        # Reset conformare's tracking even if a run or a record_run call raises,
        # instead of leaving it enabled on the error path.
        cf.restore()

    # Roll the fleet store up and answer the governance questions.
    tables = cf.fleet.read_store(dest)
    review = cf.fleet.review_report(tables)
    healthy = cf.fleet.last_healthy_run(tables, PIPELINE)
    owners = cf.fleet.owner_pipelines(tables)

    print(f"recorded {len(tables['runs'])} runs to {store_dir}")
    print(f"  statuses     : {[(r['run_id'], r['status']) for r in tables['runs']]}")
    print(f"  last healthy : {healthy['run_id'] if healthy else 'none'}")
    print(f"  owners       : {[(o['owner'], o['pipelines']) for o in owners]}")
    print(
        f"  REVIEW NEEDED: {len(review)} context(s) where implementation changed but risk did not"
    )
    for r in review:
        print(
            f"    - {r['pipeline']} / {r['context_label']}: "
            f"{r['from_impl_hash']} -> {r['to_impl_hash']} (risks {r['risk_ids']} unchanged)"
        )

    # render the fleet view as a self-contained HTML dashboard (for the docs page)
    report = os.path.join(out_dir, "fleet_recording_dashboard.html")
    cf.fleet.to_html(tables, report, title="Fleet dashboard - run recording")
    print(f"wrote {report}")
    return store_dir


if __name__ == "__main__":
    # Script entry point; main()'s returned store path is not needed here.
    main()

Output report

Open in a new tab ↗


This site uses Just the Docs, a documentation theme for Jekyll.