Governance from table comments (experimental)
A team that doesn’t use this package describes a table in its Spark table comment using an agreed format; conformare parses the purpose, owner, context and risks from that comment, ingests them into the fleet, and warns when a tracked pipeline reads the table.
Example code
Source: examples/example_comment_governance.py
"""Experimental: governance from a Spark table comment.

A Data Engineering team that does not use this package owns a `sales_channel`
reference table. They agree a small standard and put purpose / owner / context /
risks into the **table comment**, in a structured section that follows an
``@conformare`` line (see TABLE_COMMENT below). conformare reads and parses that
comment, so a developer sourcing the table is made aware of the risks -- and,
once ingested into the fleet, they surface automatically when a tracked pipeline
reads the table.

Requires pyspark; the script starts a local single-core Spark session, creates
and finally drops a managed ``sales_channel`` table, and writes its artefacts to
an ``output`` directory (the JSON store under ``output/comment_gov_store`` is
wiped at the start of every run).

Run: python examples/example_comment_governance.py
Then open output/comment_governance_dashboard.html.
"""
import os
import shutil
from pyspark.sql import SparkSession
import conformare as cf
# Governance block the business agreed on, stored in the table's own comment.
# Layout: a free-text description, an ``@conformare`` marker line, then one
# ``key: value`` line per field. Each ``risk:`` line here is written as
# ``<risk id> | <severity> | <note> | <key=value qualifier>``, with ids that
# match the risks registered in main().
TABLE_COMMENT = "\n".join(
    (
        "Canonical channel reference table.",
        "@conformare",
        "purpose: Map channel_id to a channel name",
        "owner: Data Engineering",
        "context: Channel codes were remapped in 2026; pre-2026 joins are wrong",
        "risk: definition.channel_remap | high | codes remapped in 2026 | owner=Data Engineering",
        "risk: privacy.partner_confidential | medium | partner is confidential | column=channel",
    )
)
def main(out_dir=None):
    """Run the comment-governance walkthrough end to end.

    Creates a ``sales_channel`` table whose comment carries the agreed governance
    block (``TABLE_COMMENT``), parses that comment with conformare, ingests its
    risks into a JSON fleet store, sources the table from a tracked pipeline to
    show the upstream-risk warning, and renders an HTML dashboard.

    Args:
        out_dir: Directory for the fleet store and dashboard. Defaults to an
            ``output`` folder inside the parent of this script's directory. It
            is created if missing, and its ``comment_gov_store`` subfolder is
            wiped at the start of every run.

    Returns:
        The output directory that was used.
    """
    import warnings

    out_dir = out_dir or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "output"
    )
    os.makedirs(out_dir, exist_ok=True)
    store_dir = os.path.join(out_dir, "comment_gov_store")
    if os.path.exists(store_dir):
        shutil.rmtree(store_dir)
    dest = {"writer": "json", "path": store_dir}

    spark = (
        SparkSession.builder.master("local[1]")
        .appName("conformare-comment-gov")
        .config("spark.ui.enabled", "false")
        .getOrCreate()
    )
    # Everything after the session exists is guarded, so the table is dropped and
    # the session stopped even when setup (risk registration, table creation, the
    # COMMENT statement) fails -- not just when the later demo steps fail.
    try:
        spark.sparkContext.setLogLevel("ERROR")

        cf.register_risk(
            "definition.channel_remap",
            category="Definition",
            label="Channel codes remapped in 2026",
            default_severity="high",
        )
        cf.register_risk(
            "privacy.partner_confidential",
            category="Privacy",
            label="Partner channel is confidential",
            default_severity="medium",
        )

        # 1) Data Engineering create the table and describe it via the comment (no conformare).
        spark.sql("DROP TABLE IF EXISTS sales_channel")
        spark.createDataFrame(
            [(1, "Web"), (2, "Retail"), (3, "Partner")], ["channel_id", "channel"]
        ).write.saveAsTable("sales_channel")
        # The comment is spliced into a single-quoted SQL string literal: escape
        # backslashes first (Spark SQL treats them as escape characters by
        # default), then single quotes, so an apostrophe in the description
        # cannot break the statement and the stored comment equals TABLE_COMMENT.
        comment_literal = TABLE_COMMENT.replace("\\", "\\\\").replace("'", "\\'")
        spark.sql(f"COMMENT ON TABLE sales_channel IS '{comment_literal}'")

        # 2) Read the governance straight from the comment.
        doc = cf.read_source_governance("sales_channel", spark=spark)
        print("parsed from the table comment:")
        print(f" purpose : {doc['purpose']}")
        print(f" owner : {doc['owner']}")
        print(f" context : {doc['contexts']}")
        for risk in doc["risks"]:
            col = f" [col {risk['column']}]" if risk.get("column") else ""
            print(f" risk : {risk['id']} ({risk['severity']}){col}")

        # 3) Ingest the comment's risks into the fleet (a one-off governance sync).
        cf.record_source_risk_from_comment("sales_channel", spark=spark, dest=dest)

        # 4) A tracked pipeline reads the table; warn_on_source flags the inherited risk.
        cf.configure_store(writer="json", path=store_dir, warn_on_source=True)
        cf.trackSpark()
        try:
            cf.set_profiles({"*": [cf.rowCount]})
            with warnings.catch_warnings(record=True) as caught:
                warnings.simplefilter("always")
                with cf.describe("Use channel reference", purpose="Join channel labels"):
                    channels = spark.table("sales_channel")  # sourcing the documented table
                    channels.count()
        finally:
            # Always pair trackSpark() with restore(), even if the tracked read fails.
            cf.restore()
        warned = [w for w in caught if issubclass(w.category, cf.fleet.UpstreamRiskWarning)]
        print(f"\nwarned on load: {len(warned)} table(s)")
        for w in warned:
            print(f" ! {w.message}")

        # 5) Render the inherited risk into the fleet dashboard.
        upstream = cf.check_upstream_risks(["sales_channel"], dest=dest, warn=False)
        tables = cf.fleet.read_store(dest)
        dash = os.path.join(out_dir, "comment_governance_dashboard.html")
        cf.fleet.to_html(
            tables, dash, title="Comment governance - inherited risk", upstream=upstream
        )
        print(f"wrote {dash}")
    finally:
        # Nested so the session is stopped even if the DROP itself fails.
        try:
            spark.sql("DROP TABLE IF EXISTS sales_channel")
        finally:
            spark.stop()
    return out_dir
if __name__ == "__main__":
    # Script entry point: run the walkthrough with main()'s default output directory.
    main(out_dir=None)