Cross-run governance
Once runs are landing in a store, conformare.fleet answers the questions a business asks across every pipeline. Each query takes as input the grain tables returned by cf.fleet.read_store(config) or cf.fleet.merge_records([...]):
```python
import conformare as cf

# Assumes an active SparkSession bound to the name `spark`.
tables = cf.fleet.read_store({"writer": "spark_table", "spark": spark,
                              "catalog": "gov", "schema": "conformare"})
```
Implementation-vs-risk review report
The headline check. For each context, it compares consecutive runs and flags each transition where the implementation changed but the risk assessment did not. That pattern suggests the governance record may have drifted out of step with the code, so a human should review it.
```python
for r in cf.fleet.review_report(tables):
    print(r["pipeline"], "/", r["context_label"],
          ":", r["from_impl_hash"], "->", r["to_impl_hash"],
          "(risks", r["risk_ids"], "unchanged)")
```
It uses the per-context impl_hash / risk_hash (see Fleet recording): review_needed = impl_hash changed AND risk_hash unchanged. Because impl_hash ignores data and loop counts, this fires on real code changes, not on new data.
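The rule is easiest to see on a toy history. The following is a minimal sketch in plain Python, not Conformare's implementation. The record shape is a hypothetical set of dicts with keys `pipeline`, `context_label`, `ts`, `impl_hash`, `risk_hash` and `risk_ids`, modelled on the fields the report exposes. The `review_needed` function name is also illustrative.

```python
from itertools import groupby
from operator import itemgetter


def review_needed(records):
    """Flag consecutive runs of a context whose impl_hash changed but risk_hash did not.

    Each record is a dict with hypothetical keys:
    pipeline, context_label, ts, impl_hash, risk_hash, risk_ids.
    """
    context = itemgetter("pipeline", "context_label")
    flagged = []
    # Sort so each (pipeline, context) group is contiguous and in time order
    # (ISO-8601 timestamp strings sort chronologically).
    ordered = sorted(records, key=lambda r: (context(r), r["ts"]))
    for (pipeline, label), group in groupby(ordered, key=context):
        runs = list(group)
        # Compare every run with the run immediately before it.
        for prev, curr in zip(runs, runs[1:]):
            if prev["impl_hash"] != curr["impl_hash"] and prev["risk_hash"] == curr["risk_hash"]:
                flagged.append({
                    "pipeline": pipeline,
                    "context_label": label,
                    "from_impl_hash": prev["impl_hash"],
                    "to_impl_hash": curr["impl_hash"],
                    "risk_ids": curr["risk_ids"],
                })
    return flagged


history = [
    # Same code, new data: impl_hash unchanged, so nothing is flagged.
    {"pipeline": "customer_scoring", "context_label": "score", "ts": "2026-01-01",
     "impl_hash": "a1", "risk_hash": "r1", "risk_ids": ["R1"]},
    {"pipeline": "customer_scoring", "context_label": "score", "ts": "2026-02-01",
     "impl_hash": "a1", "risk_hash": "r1", "risk_ids": ["R1"]},
    # Code changed but the risk assessment did not: flagged for review.
    {"pipeline": "customer_scoring", "context_label": "score", "ts": "2026-03-01",
     "impl_hash": "b2", "risk_hash": "r1", "risk_ids": ["R1"]},
    # Code and risks both changed: the record kept up, so nothing is flagged.
    {"pipeline": "customer_scoring", "context_label": "score", "ts": "2026-04-01",
     "impl_hash": "c3", "risk_hash": "r2", "risk_ids": ["R1", "R2"]},
]
flags = review_needed(history)
```

Only the a1 -> b2 transition is flagged. The data-only rerun is ignored because its impl_hash did not move. The last transition is ignored because the risk assessment changed together with the code.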
Upstream risk on the tables you source
When you load a table, you may be inheriting risk recorded elsewhere in the fleet. If that table was written as a sink by another run, the risks that fed it travel with it:
```python
report = cf.check_upstream_risks(["lake.clv_output", "lake.sales_channel"])
```
It emits an UpstreamRiskWarning and returns, per location, the inherited risks, classified by how they reach the sink:
- direct — a risk attributed to the sink itself (distance 0);
- indirect — a risk on an upstream step/context that fed the sink, with a process distance (how many steps away it was);
- process — a process-wide risk on the producing pipeline;
- static — a manually declared risk (below).
This is precomputed per sink at record time (the sink_risks grain), so the check is a fast lookup by table location.
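The sketch below illustrates the classification and the precomputed lookup in plain Python. All of it is hypothetical: the lineage dictionaries, the entry shape, and the `build_sink_index` / `check_upstream` names are illustrations, not Conformare's schema or API. The sketch also leaves out the warning.

The index is built in three steps:

1. Walk upstream from each sink breadth-first, so every node gets its (shortest) process distance.
2. Attach the step, process and static risks to the sink's location.
3. Store the result keyed by table location, so the check itself is a single dictionary lookup.

```python
from collections import deque


def build_sink_index(sinks, feeds, step_risks, process_risks, static_risks):
    """Precompute inherited risks per table location (hypothetical data model).

    sinks:         {location: (pipeline, sink_node)}
    feeds:         {node: [upstream nodes that feed it]}
    step_risks:    {node: [risk ids attributed to that step, context or sink]}
    process_risks: {pipeline: [process-wide risk ids]}
    static_risks:  {location: [manually declared risk ids]}
    """
    index = {}
    for location, (pipeline, sink) in sinks.items():
        # Breadth-first search upstream from the sink: dist[node] is the number
        # of process steps between that node and the sink (0 for the sink itself).
        dist = {sink: 0}
        queue = deque([sink])
        while queue:
            node = queue.popleft()
            for upstream in feeds.get(node, []):
                if upstream not in dist:
                    dist[upstream] = dist[node] + 1
                    queue.append(upstream)
        entries = []
        for node, d in dist.items():
            kind = "direct" if d == 0 else "indirect"
            entries += [{"risk": r, "kind": kind, "distance": d} for r in step_risks.get(node, [])]
        entries += [{"risk": r, "kind": "process", "distance": None}
                    for r in process_risks.get(pipeline, [])]
        index[location] = entries
    # Static declarations may target tables that no recorded run ever wrote.
    for location, risks in static_risks.items():
        index.setdefault(location, []).extend(
            {"risk": r, "kind": "static", "distance": None} for r in risks)
    return index


def check_upstream(index, locations):
    """At check time this is only a lookup; unknown tables carry no recorded risk."""
    return {loc: index.get(loc, []) for loc in locations}


index = build_sink_index(
    sinks={"lake.clv_output": ("value_model", "write")},
    feeds={"write": ["transform"], "transform": ["load"]},
    step_risks={"load": ["R1"], "write": ["R2"]},
    process_risks={"value_model": ["R3"]},
    static_risks={"lake.sales_channel": ["definition.business_rule"]},
)
report = check_upstream(index, ["lake.clv_output", "lake.sales_channel", "lake.other"])
```

Here lake.clv_output inherits R2 directly (distance 0), R1 indirectly from two steps upstream, and R3 as a process-wide risk. lake.sales_channel carries only its static declaration.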
Automatic on load. Configure the store with warn_on_source=True and Conformare warns automatically whenever a pipeline loads a table that carries inherited risk – no explicit call needed:
```python
cf.configure_store(writer="spark_table", spark=spark, catalog="gov", schema="conformare",
                   warn_on_source=True)
cf.trackSpark()
df = spark.read.table("lake.clv_output")  # -> UpstreamRiskWarning if it carries risk
```
The lookup is read once and cached, and each table warns at most once per session, so the overhead is negligible.
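The two guarantees can be sketched with the standard library alone:

- a cached loader, so the store is read once;
- a per-session set of already-warned tables, so each table warns at most once.

This is an assumption about how such a hook could work, not Conformare's code. The `UpstreamRiskWarning` class is a local stand-in, and `sink_risk_index` and `on_table_read` are hypothetical names.

```python
import warnings
from functools import lru_cache


class UpstreamRiskWarning(UserWarning):
    """Local stand-in for Conformare's warning class, so the sketch runs on its own."""


@lru_cache(maxsize=None)
def sink_risk_index():
    # Hypothetical loader: a real version would read the sink_risks grain from the store.
    # lru_cache means the store is read once per process, however many tables are loaded.
    return {"lake.clv_output": ("R1", "R2")}


_warned = set()  # tables already warned about in this session


def on_table_read(location):
    """Called on each table load; warns at most once per table location."""
    risks = sink_risk_index().get(location, ())
    if risks and location not in _warned:
        _warned.add(location)
        warnings.warn(f"{location} carries inherited risk: {', '.join(risks)}",
                      UpstreamRiskWarning, stacklevel=2)
    return risks


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # record every warning, bypassing Python's own dedup
    for table in ["lake.clv_output", "lake.clv_output", "lake.sales_channel"]:
        on_table_read(table)
print(len(caught))  # 1: the second read of lake.clv_output is silent
```

The explicit `_warned` set does the per-table deduplication. Python's default warning filter deduplicates by call site, not by table, so relying on the filter alone would not give the "once per table" behaviour.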
Static risks for externally-produced tables
Not every source is built with this package – a Data Engineering team may hand you a table with its own business rules and caveats. Declare those once, as a static statement, and the fleet treats them like any other upstream risk (labelled static, with the same owner / severity / note fields):
```python
cf.record_source_risk(
    "lake.sales_channel", "definition.business_rule",
    column="channel", owner="Data Engineering", severity="high",
    note="channel codes were remapped in 2026 -- see the DE wiki",
)
```
Static declarations are one-off and not actively updated; re-declaring the same set is idempotent on file stores. check_upstream_risks considers them alongside run-derived risk.
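Idempotence on a file store amounts to "append only if this exact declaration is not already there". The sketch below shows that idea under an assumed layout: a single JSON file holding a list of declaration dicts. Both the layout and the `record_static_risk` helper are hypothetical, and Conformare's actual file format may differ.

```python
import json
import tempfile
from pathlib import Path


def record_static_risk(store, location, risk_id, **fields):
    """Add a static risk declaration to a JSON-file store, skipping exact duplicates.

    Hypothetical layout: the file holds a JSON list of declaration dicts.
    """
    store = Path(store)
    declarations = json.loads(store.read_text()) if store.exists() else []
    declaration = {"location": location, "risk_id": risk_id, **fields}
    # Re-declaring an identical statement is a no-op, which makes the call idempotent.
    if declaration not in declarations:
        declarations.append(declaration)
        store.write_text(json.dumps(declarations, indent=2, sort_keys=True))
    return declarations


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "static_risks.json"
    for _ in range(2):  # declare the same statement twice
        declared = record_static_risk(path, "lake.sales_channel", "definition.business_rule",
                                      column="channel", owner="Data Engineering",
                                      severity="high")
    on_disk = json.loads(path.read_text())
```

After two identical calls the store still holds a single declaration.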
The other questions
| Question | Call |
|---|---|
| What is our last healthy run? | cf.fleet.last_healthy_run(tables, "customer_scoring") |
| Who owns what (succession / transfer planning)? | cf.fleet.owner_pipelines(tables) → owner → pipelines + scopes |
| Which sinks were active as of a date? | cf.fleet.sinks_as_of(tables, "customer_scoring", "2026-06-30T00:00:00Z") |
| What risks exist across the fleet? | tables["risks"] (filter / aggregate as you like) |
| How has process health trended over time? | tables["runs"] (trend status by ts) |
“Active as of” needs no separate state: a run is timestamped, so the sinks active at a point in time are simply the sinks of the latest run at or before it.
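The "latest run at or before" rule amounts to a binary search over a pipeline's timestamped runs. The following is a minimal sketch, not the cf.fleet.sinks_as_of implementation. The run shape (dicts with hypothetical keys `pipeline`, `ts`, `sinks`) and the table names in the example are illustrative.

```python
from bisect import bisect_right
from datetime import datetime


def _parse(ts):
    # Older Pythons' fromisoformat rejects a trailing "Z", so normalise it to +00:00.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def sinks_as_of(runs, pipeline, as_of):
    """Sinks of the latest run of `pipeline` at or before `as_of` ([] if none).

    Each run is a dict with hypothetical keys: pipeline, ts (ISO-8601) and sinks.
    """
    history = sorted((_parse(r["ts"]), r["sinks"]) for r in runs if r["pipeline"] == pipeline)
    # bisect_right returns the index just past every run with ts <= as_of.
    i = bisect_right([ts for ts, _ in history], _parse(as_of))
    return history[i - 1][1] if i else []


run_log = [
    {"pipeline": "customer_scoring", "ts": "2026-05-01T00:00:00Z", "sinks": ["lake.scores_v1"]},
    {"pipeline": "customer_scoring", "ts": "2026-07-01T00:00:00Z", "sinks": ["lake.scores_v2"]},
]
print(sinks_as_of(run_log, "customer_scoring", "2026-06-30T00:00:00Z"))  # ['lake.scores_v1']
```

Using `bisect_right` makes the bound inclusive: a run stamped exactly at the query time counts as active.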
Querying the tables directly
Everything above can also be answered with plain SQL over the Delta tables. For example, the last healthy run per pipeline:
```sql
SELECT pipeline, max(ts) AS last_healthy
FROM gov.conformare.conformare_runs
WHERE status = 'healthy'
GROUP BY pipeline
```
…or every pipeline an owner is responsible for (succession):
```sql
SELECT owner, collect_set(pipeline) AS pipelines
FROM gov.conformare.conformare_owners
GROUP BY owner
```
The Python helpers are convenience wrappers; the normalized tables are the contract.
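Because the tables are the contract, a question with no dedicated helper can be answered straight from the rows. One example from the table above is the process-health trend. The sketch below computes the share of healthy runs per pipeline per month in plain Python. It assumes the runs have been collected into dicts with the `pipeline`, `ts` and `status` columns used in the SQL above. Only the 'healthy' status appears on this page, so the "degraded" value in the example is invented for illustration, and `health_trend` is a hypothetical name.

```python
from collections import defaultdict


def health_trend(runs):
    """Share of healthy runs per (pipeline, month), from rows shaped like conformare_runs.

    Assumes each row has pipeline, ts (ISO-8601 string) and status; any status
    other than 'healthy' counts as not healthy.
    """
    counts = defaultdict(lambda: [0, 0])  # (pipeline, "YYYY-MM") -> [healthy, total]
    for row in runs:
        key = (row["pipeline"], row["ts"][:7])  # month bucket from the ISO timestamp
        counts[key][1] += 1
        if row["status"] == "healthy":
            counts[key][0] += 1
    return {key: healthy / total for key, (healthy, total) in sorted(counts.items())}


rows = [
    {"pipeline": "customer_scoring", "ts": "2026-05-02T10:00:00Z", "status": "healthy"},
    {"pipeline": "customer_scoring", "ts": "2026-05-09T10:00:00Z", "status": "degraded"},
    {"pipeline": "customer_scoring", "ts": "2026-06-01T10:00:00Z", "status": "healthy"},
]
trend = health_trend(rows)
```

The same grouping is a one-line GROUP BY in SQL. The Python form is only convenient when the rows are already in memory.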
Worked example
examples/example_fleet_streaming_clv.py records two PySpark pipelines for a movie-streaming customer-lifetime-value programme to a Spark-table store:

- a value_model pipeline (opaque watch forecast -> revenue -> opex -> NPV), carrying model risks;
- an audience_reports pipeline (young/old segmentation -> two report sinks), carrying a “customer names written” privacy risk.

It then reads the store back to list the risks from both pipelines in one place and to flag the sinks that carry sensitive columns.