Fleet: risks on untracked sources
Declare static risks for an externally-produced table (one not built with this package), then a tracked pipeline that reads it inherits them – warned on load and surfaced in the dashboard’s inherited-risk section.
Example code
Source: examples/example_fleet_source_risks.py
"""Fleet: declare risks for an UNTRACKED data source, then surface them when a tracked
pipeline sources that table.
A common situation: a Data Engineering team (who do not use this package) hand you a
reference table with important, undocumented business rules. You can capture that as a
one-off **static source risk**, and from then on any tracked pipeline that *reads* the
table inherits the risk -- which shows up in the report (and, with ``warn_on_source=True``,
as a warning the moment the table is loaded).
This example:
1. Writes an external ``sales_channel`` table (as if produced elsewhere) and declares two
static risks against it with ``cf.record_source_risk`` -- one table-level, one on a column.
2. Runs a tracked pandas pipeline that reads that table (plus transactions), joins,
aggregates revenue per channel, and writes an output.
3. Records the run, then asks the fleet for the upstream risk on the tables it sourced and
renders it into the fleet dashboard's "Inherited risk on sourced tables" section.
Run: python examples/example_fleet_source_risks.py
Then open output/channel_fleet_dashboard.html (and output/channel_revenue_report.html).
"""
import os
import shutil
import warnings
import pandas as pd
import conformare as cf
def main(out_dir=None):
out_dir = out_dir or os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "output"
)
os.makedirs(out_dir, exist_ok=True)
store_dir = os.path.join(out_dir, "channel_fleet_store")
if os.path.exists(store_dir):
shutil.rmtree(store_dir)
inputs = os.path.join(out_dir, "channel_inputs")
os.makedirs(inputs, exist_ok=True)
# An external reference table + a transactions table, produced WITHOUT this package.
sales_channel = os.path.join(inputs, "sales_channel.csv")
pd.DataFrame({"channel_id": [1, 2, 3], "channel": ["Web", "Retail", "Partner"]}).to_csv(
sales_channel, index=False
)
transactions = os.path.join(inputs, "transactions.csv")
pd.DataFrame(
{"channel_id": [1, 1, 2, 3, 2], "amount": [100.0, 50.0, 200.0, 75.0, 40.0]}
).to_csv(transactions, index=False)
# Configure the fleet store; warn_on_source makes sourcing a risky table warn on load.
cf.configure_store(writer="json", path=store_dir, warn_on_source=True)
# 1) Declare static risks for the untracked sales_channel table -- a one-off statement.
cf.register_risk(
"definition.channel_remap",
category="Definition",
label="Sales-channel codes remapped in 2026",
default_severity="high",
)
cf.register_risk(
"privacy.partner_confidential",
category="Privacy",
label="Partner channel is commercially confidential",
default_severity="medium",
)
cf.record_source_risk(
sales_channel,
"definition.channel_remap",
owner="Data Engineering",
note="channel codes were redefined in 2026; pre-2026 joins are wrong",
)
cf.record_source_risk(
sales_channel,
"privacy.partner_confidential",
column="channel",
owner="Commercial",
note="the 'Partner' value must not be shared externally",
)
# 2) A TRACKED pipeline reads the external tables (auto-warns on the risky one), joins,
# aggregates, and writes an output.
cf.trackPandas()
cf.set_profiles({"*": [cf.rowCount]})
cf.describe_process(
"Revenue by sales channel, built on an externally-supplied reference table."
)
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always")
with cf.describe("Load & join", purpose="Attach channel labels to transactions"):
channels = pd.read_csv(sales_channel) # the risk-bearing external source
tx = pd.read_csv(transactions)
joined = tx.merge(channels, on="channel_id", how="left")
with cf.describe("Aggregate", purpose="Sum revenue per channel"):
revenue = joined.groupby("channel", as_index=False).agg(revenue=("amount", "sum"))
revenue.to_csv(os.path.join(out_dir, "revenue_by_channel.csv"), index=False)
on_load = [w for w in caught if issubclass(w.category, cf.fleet.UpstreamRiskWarning)]
# 3) Record the run, write the per-run report, then surface upstream risk on its sources.
cf.record_run("channel_revenue", env="prod")
per_run = os.path.join(out_dir, "channel_revenue_report.html")
cf.to_html(per_run, title="Revenue by channel")
cf.restore()
tables = cf.fleet.read_store({"writer": "json", "path": store_dir})
sourced = sorted({s["location"] for s in tables["sources"]})
upstream = cf.check_upstream_risks(
sourced, dest={"writer": "json", "path": store_dir}, warn=False
)
dashboard = os.path.join(out_dir, "channel_fleet_dashboard.html")
cf.fleet.to_html(
tables, dashboard, title="Channel revenue - fleet & upstream risk", upstream=upstream
)
print(f"warned on load: {len(on_load)} table(s)")
for w in on_load:
print(f" ! {w.message}")
print("\nupstream risk on the pipeline's sources:")
for u in upstream:
tag = "" if not u["n_risks"] else f" <-- {u['n_risks']} risk(s)"
print(f" {os.path.basename(u['location']):20}{tag}")
for r in u["risks"]:
col = f" [col {r['column']}]" if r.get("column") else ""
print(
f" - [{r['kind']}] {r['label']} ({r['severity']}, owner {', '.join(r['owners']) or '-'}){col}"
)
print(f"\nwrote {per_run}")
print(f"wrote {dashboard}")
return dashboard
if __name__ == "__main__":
main()