Governance from table comments (experimental)

A team that doesn’t use this package documents one of its tables in that table’s Spark comment, using an agreed format; conformare parses the purpose, owner, context, and risks from the comment, ingests them into the fleet, and warns when a tracked pipeline sources the table.

Open the full report ↗

Example code

Source: examples/example_comment_governance.py

"""Experimental: governance from a Spark table comment.

A Data Engineering team (who don't use this package) own a `sales_channel` reference table.
They agree a small standard and put purpose / owner / context / risks into the **table
comment**. conformare reads and parses that comment, so a developer sourcing the table is made
aware of the risks -- and, once ingested into the fleet, they surface automatically when a
tracked pipeline reads the table.

Run:  python examples/example_comment_governance.py
Then open output/comment_governance_dashboard.html.
"""

import os
import shutil

from pyspark.sql import SparkSession

import conformare as cf

# The standard the business agreed, embedded in the table's own comment.
# One entry per comment line: a free-text summary, the "@conformare" marker, then
# "key: value" fields (risk lines are pipe-separated).
TABLE_COMMENT = "\n".join(
    [
        "Canonical channel reference table.",
        "@conformare",
        "purpose: Map channel_id to a channel name",
        "owner: Data Engineering",
        "context: Channel codes were remapped in 2026; pre-2026 joins are wrong",
        "risk: definition.channel_remap | high | codes remapped in 2026 | owner=Data Engineering",
        "risk: privacy.partner_confidential | medium | partner is confidential | column=channel",
    ]
)


def main(out_dir=None):
    """Run the comment-governance example end to end.

    Steps, in order: register the two risk ids used by the table comment, create a
    `sales_channel` table and attach `TABLE_COMMENT` to it, read the governance back from
    the comment and print it, record the comment's risks into a JSON store, read the table
    inside a tracked `cf.describe` block while capturing warnings, and finally write the
    fleet dashboard HTML.

    Args:
        out_dir: Directory for the JSON store and the dashboard. When falsy, defaults to
            an ``output`` directory next to the parent of this file's directory.

    Returns:
        The output directory that was used.
    """
    import warnings

    out_dir = out_dir or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "output"
    )
    os.makedirs(out_dir, exist_ok=True)
    store_dir = os.path.join(out_dir, "comment_gov_store")
    # Start from an empty store so repeated runs do not accumulate records.
    if os.path.exists(store_dir):
        shutil.rmtree(store_dir)
    dest = {"writer": "json", "path": store_dir}

    spark = (
        SparkSession.builder.master("local[1]")
        .appName("conformare-comment-gov")
        .config("spark.ui.enabled", "false")
        .getOrCreate()
    )
    # Everything after session creation runs under try/finally so the session is stopped
    # (and the example table dropped) even when setup fails part-way through.
    try:
        spark.sparkContext.setLogLevel("ERROR")
        cf.register_risk(
            "definition.channel_remap",
            category="Definition",
            label="Channel codes remapped in 2026",
            default_severity="high",
        )
        cf.register_risk(
            "privacy.partner_confidential",
            category="Privacy",
            label="Partner channel is confidential",
            default_severity="medium",
        )

        # 1) Data Engineering create the table and describe it via the comment (no conformare).
        spark.sql("DROP TABLE IF EXISTS sales_channel")
        spark.createDataFrame(
            [(1, "Web"), (2, "Retail"), (3, "Partner")], ["channel_id", "channel"]
        ).write.saveAsTable("sales_channel")
        # TABLE_COMMENT is a module constant with no single quotes, so it can be placed
        # directly inside the SQL string literal.
        spark.sql(f"COMMENT ON TABLE sales_channel IS '{TABLE_COMMENT}'")

        # 2) Read the governance straight from the comment.
        doc = cf.read_source_governance("sales_channel", spark=spark)
        print("parsed from the table comment:")
        print(f"  purpose : {doc['purpose']}")
        print(f"  owner   : {doc['owner']}")
        print(f"  context : {doc['contexts']}")
        for r in doc["risks"]:
            col = f" [col {r['column']}]" if r.get("column") else ""
            print(f"  risk    : {r['id']} ({r['severity']}){col}")

        # 3) Ingest the comment's risks into the fleet (a one-off governance sync).
        cf.record_source_risk_from_comment("sales_channel", spark=spark, dest=dest)

        # 4) A tracked pipeline reads the table; warn_on_source flags the inherited risk.
        cf.configure_store(writer="json", path=store_dir, warn_on_source=True)
        cf.trackSpark()
        try:
            cf.set_profiles({"*": [cf.rowCount]})
            with warnings.catch_warnings(record=True) as caught:
                # "always" so a warning is recorded even if it was already raised once.
                warnings.simplefilter("always")
                with cf.describe("Use channel reference", purpose="Join channel labels"):
                    channels = spark.table("sales_channel")  # sourcing the documented table
                    channels.count()
        finally:
            # Pair every trackSpark() with restore(), including when the tracked read fails.
            # NOTE(review): presumably restore() undoes the trackSpark() instrumentation --
            # confirm against the conformare API.
            cf.restore()
        warned = [w for w in caught if issubclass(w.category, cf.fleet.UpstreamRiskWarning)]
        print(f"\nwarned on load: {len(warned)} table(s)")
        for w in warned:
            print(f"  ! {w.message}")

        # 5) Render the inherited risk into the fleet dashboard.
        upstream = cf.check_upstream_risks(["sales_channel"], dest=dest, warn=False)
        tables = cf.fleet.read_store(dest)
        dash = os.path.join(out_dir, "comment_governance_dashboard.html")
        cf.fleet.to_html(
            tables, dash, title="Comment governance - inherited risk", upstream=upstream
        )
        print(f"wrote {dash}")
    finally:
        # Stop the session even if dropping the example table fails.
        try:
            spark.sql("DROP TABLE IF EXISTS sales_channel")
        finally:
            spark.stop()
    return out_dir


# Script entry point: run the example only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()

Output report

Open in a new tab ↗


This site uses Just the Docs, a documentation theme for Jekyll.