Fleet: recording runs
Record runs to a central store with `cf.record_run`, then roll them up. When two runs differ in implementation but not in risk assessment, the fleet dashboard flags them as needing review.
Example code
Source: examples/example_fleet_recording.py
"""Fleet: record runs to a central store and surface implementation-vs-risk drift.
Recording is **explicit** -- call ``cf.record_run(pipeline=...)`` to append an immutable
run record to a configured store. ``pipeline`` is the identity across runs; ``env`` /
``git_sha`` / ``tags`` are attributes. This example uses the portable JSON store so it runs
anywhere; in a Spark-only environment (no blob access) configure the Spark-table writer
instead -- nothing else changes::

    cf.configure_store(writer="spark_table", spark=spark,
                       catalog="gov", schema="conformare")

It records two runs of the same pipeline+context where the **implementation changes but the
risk assessment does not** -- exactly the stale-governance smell the review report flags.
Run: python examples/example_fleet_recording.py
"""
import os
import shutil
import pandas as pd
import conformare as cf
# Pipeline identity shared by both recorded runs: passed as the first argument
# to ``cf.record_run`` and used to look up this pipeline's last healthy run.
PIPELINE: str = "customer_scoring"
def _run_pipeline(op: str):
    """Execute one tracked run of the toy customer-scoring pipeline.

    Resets conformare's tracking state, re-enables pandas tracking, then
    performs a single feature-engineering step inside a described context
    that declares one high-severity risk.

    Args:
        op: Selects the implementation of the feature step. ``"div"``
            computes ``spend / tenure``; any other value computes
            ``spend / tenure * 1.2``. The declared risk is identical in
            both cases.
    """
    # Clear state left over from any previous run before tracking again.
    cf.restore()
    cf.store.clear()
    cf.reset_context()
    cf.trackPandas()
    cf.set_profiles({})

    customers = {
        "region": ["UK", "US", "UK"],
        "spend": [120.0, 80.0, 200.0],
        "tenure": [3, 2, 5],
    }
    frame = pd.DataFrame(customers)

    # Nested form of the two-item ``with``: describe is entered before the
    # risk is created, matching the multi-item statement's semantics.
    with cf.describe("Engineer features", purpose="Per-customer derived columns"):
        with cf.risk(
            "pii-in-features", severity="high", mitigation="reviewed by DPO", owner="Alice"
        ):
            if op != "div":
                # Implementation changed -- but the risk declared above is not.
                frame.assign(value=frame["spend"] / frame["tenure"] * 1.2)
            else:
                frame.assign(value=frame["spend"] / frame["tenure"])
def main(out_dir=None):
    """Record two runs of ``PIPELINE`` to a JSON fleet store and report on them.

    The two runs share the same pipeline and described context. Only the
    implementation of the feature step changes between them; the declared
    risk does not. The store is then rolled up, and the run statuses, last
    healthy run, owners and review-needed contexts are printed. Finally an
    HTML dashboard is written next to the store.

    Args:
        out_dir: Directory for the store and the dashboard. Defaults to an
            ``output`` directory beside the directory containing this file.

    Returns:
        The path of the fleet store directory that was written.
    """
    out_dir = out_dir or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "output"
    )
    store_dir = os.path.join(out_dir, "fleet_store")
    # Start from an empty store so the roll-up reflects exactly the runs below.
    if os.path.exists(store_dir):
        shutil.rmtree(store_dir)
    dest = {"writer": "json", "path": store_dir}

    # Two runs over time: same pipeline + context, implementation changes, risk does not.
    # Each entry: (implementation op, git_sha, run_id, timestamp).
    runs = (
        ("div", "aaaa111", "run-1", "2026-06-01T09:00:00Z"),
        ("mul", "bbbb222", "run-2", "2026-06-08T09:00:00Z"),
    )
    try:
        for op, git_sha, run_id, ts in runs:
            _run_pipeline(op)
            cf.record_run(
                PIPELINE,
                dest=dest,
                env="prod",
                git_sha=git_sha,
                run_id=run_id,
                ts=ts,
            )
    finally:
        # _run_pipeline calls cf.trackPandas(); cf.restore() presumably undoes
        # it. Run it even if a run or a recording fails, so callers (e.g. a
        # test suite invoking main()) are not left with tracking switched on.
        cf.restore()

    # Roll the fleet store up and answer the governance questions.
    tables = cf.fleet.read_store(dest)
    review = cf.fleet.review_report(tables)
    healthy = cf.fleet.last_healthy_run(tables, PIPELINE)
    owners = cf.fleet.owner_pipelines(tables)

    print(f"recorded {len(tables['runs'])} runs to {store_dir}")
    print(f" statuses : {[(r['run_id'], r['status']) for r in tables['runs']]}")
    print(f" last healthy : {healthy['run_id'] if healthy else 'none'}")
    print(f" owners : {[(o['owner'], o['pipelines']) for o in owners]}")
    print(
        f" REVIEW NEEDED: {len(review)} context(s) where implementation changed but risk did not"
    )
    for r in review:
        print(
            f" - {r['pipeline']} / {r['context_label']}: "
            f"{r['from_impl_hash']} -> {r['to_impl_hash']} (risks {r['risk_ids']} unchanged)"
        )

    # Render the fleet view as a self-contained HTML dashboard (for the docs page).
    report = os.path.join(out_dir, "fleet_recording_dashboard.html")
    cf.fleet.to_html(tables, report, title="Fleet dashboard - run recording")
    print(f"wrote {report}")
    return store_dir
if __name__ == "__main__":
    # Script entry point: run the example with the default output directory.
    main(out_dir=None)