Connections (writers & stores)

A writer sends a RunRecord to a destination. Set a default once with configure_store(...), or pass dest=... per call to record_run. If neither is set, record_run raises an error.

cf.configure_store(writer="spark_table", spark=spark, catalog="gov", schema="conformare")
# ... or per call:
cf.record_run("customer_scoring", dest={"writer": "json", "path": "/mnt/gov/runs"})
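
The destination lookup can be sketched in plain Python. The sketch assumes that a per-call dest takes precedence over the configured default, and that recording with neither fails. configure_store_sketch and resolve_dest are hypothetical stand-ins written for illustration, not part of cf. The error type and message are also assumptions.

```python
from __future__ import annotations

from typing import Any, Optional

# Hypothetical module-level default, standing in for what configure_store(...) records.
_default_dest: Optional[dict[str, Any]] = None


def configure_store_sketch(**dest: Any) -> None:
    """Remember a default destination (illustrative stand-in for configure_store)."""
    global _default_dest
    _default_dest = dict(dest)


def resolve_dest(dest: Optional[dict[str, Any]] = None) -> dict[str, Any]:
    """Pick the destination a record_run-style call would use (assumed precedence)."""
    if dest is not None:          # an explicit per-call dest wins
        return dest
    if _default_dest is not None:  # otherwise fall back to the configured default
        return _default_dest
    # Assumption: with neither available, recording fails loudly.
    raise RuntimeError("no store configured: call configure_store(...) or pass dest=...")
```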

Available writers

spark_table (default) — no blob access needed

Writes each grain to a managed table through the Spark table API (createDataFrame(...).write.saveAsTable(...)). This suits environments where developers can write tables but not blobs. Tables are named {catalog}.{schema}.{prefix}{grain} (catalog and schema are optional) and have explicit, type-stable schemas. Each table is created on the first write and appended to thereafter.

cf.configure_store(
    writer="spark_table", spark=spark,
    catalog="gov", schema="conformare", table_prefix="conformare_",
    format="delta",        # or "parquet" where Delta isn't available
)

With the settings above, this produces the tables gov.conformare.conformare_runs, …_contexts, …_risks, …_sources, …_sinks, …_expectations, and …_owners.
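
The naming rule {catalog}.{schema}.{prefix}{grain} can be sketched as a small function. table_name is a hypothetical helper, not cf's implementation. Leaving out unset parts, and rejecting a catalog without a schema, are choices made for this sketch. The grain names are taken from the table list above.

```python
# Grain names as listed in the documentation above.
GRAINS = ("runs", "contexts", "risks", "sources", "sinks", "expectations", "owners")


def table_name(grain, catalog=None, schema=None, prefix=""):
    """Build {catalog}.{schema}.{prefix}{grain}, leaving out parts that are not set."""
    if catalog and not schema:
        # Sketch choice: "catalog.table" would be read as "schema.table", so reject it.
        raise ValueError("catalog requires schema")
    parts = [p for p in (catalog, schema) if p]
    parts.append(f"{prefix}{grain}")
    return ".".join(parts)
```

For example, table_name("runs", "gov", "conformare", "conformare_") yields gov.conformare.conformare_runs, matching the first table above.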

json / parquet — portable file stores

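For environments with blob access. Run files are immutable and never rewritten, so no transaction layer is needed. json writes one file per run. parquet writes one file per grain per run, partitioned by pipeline: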

cf.configure_store(writer="json", path="/mnt/gov/runs")      # {path}/{pipeline}/{run_id}.json
cf.configure_store(writer="parquet", path="/mnt/gov/runs")   # {path}/{grain}/pipeline=…/{run_id}.parquet

parquet output is typed and columnar. You can register an external table or view over the folder, or query it directly with DuckDB, Athena, Spark, or Trino.
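
The two path templates can be sketched with pathlib. json_run_path, parquet_grain_path and write_json_once are hypothetical helpers, not cf's internals. The sketch makes two further assumptions. First, the pipeline=… folder holds the pipeline name. Second, an exclusive-create open (mode "x") is one way to keep each run file immutable.

```python
import json
from pathlib import Path


def json_run_path(root, pipeline, run_id):
    # {path}/{pipeline}/{run_id}.json
    return Path(root) / pipeline / f"{run_id}.json"


def parquet_grain_path(root, grain, pipeline, run_id):
    # {path}/{grain}/pipeline=<name>/{run_id}.parquet (Hive-style partition folder, assumed)
    return Path(root) / grain / f"pipeline={pipeline}" / f"{run_id}.parquet"


def write_json_once(root, pipeline, run_id, record):
    """Write one run as a single immutable JSON file; refuse to overwrite it."""
    path = json_run_path(root, pipeline, run_id)
    path.parent.mkdir(parents=True, exist_ok=True)
    # Mode "x" is exclusive create: it raises FileExistsError if the run file already exists.
    with open(path, "x", encoding="utf-8") as fh:
        json.dump(record, fh)
    return path
```

The pipeline=<name> folder naming is the Hive partitioning convention that engines such as Spark, Trino and DuckDB can typically discover as a column.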

callable — bring your own sink

Pass any function that accepts the record and forwards it elsewhere, for example a warehouse loader, an OpenLineage emitter, or a message-queue producer:

def to_snowflake(record):
    ...  # forward the RunRecord to your warehouse here
cf.configure_store(writer=to_snowflake)
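
A callable writer only needs to accept a single argument, the record. The sketch below builds one that serializes each record to JSON and puts it on a queue.Queue, which stands in for a message-queue producer. make_queue_sink is hypothetical. The shape of RunRecord is not documented here, so the conversion assumes the record is a plain dict, a dataclass, or an object with a __dict__.

```python
import dataclasses
import json
import queue


def _to_jsonable(obj):
    # Assumption: RunRecord is a dataclass or a plain object; adapt to the real type.
    if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
        return dataclasses.asdict(obj)
    return vars(obj)


def make_queue_sink(q: queue.Queue):
    """Return a writer callable that puts each record on q as a JSON string."""
    def sink(record):
        # json.dumps calls _to_jsonable only for objects it cannot serialize itself.
        q.put(json.dumps(record, default=_to_jsonable))
    return sink
```

The returned function would then be passed as writer=make_queue_sink(q) to cf.configure_store.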

Reading runs back

cf.fleet.read_store(config) reads grain tables back from the same config you wrote with (json, parquet, or spark_table). cf.fleet.merge_records([...]) does the same for records you already hold in memory. Both return a {grain: [rows]} dict ready for the governance queries.

tables = cf.fleet.read_store({"writer": "spark_table", "spark": spark,
                              "catalog": "gov", "schema": "conformare"})
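
Because read_store and merge_records both return a {grain: [rows]} dict, combining and querying results is plain list processing. The helpers below are hypothetical illustrations, not cf functions. The pipeline field on rows of the runs grain is an assumed column name.

```python
from collections import defaultdict


def merge_grain_dicts(per_run):
    """Concatenate several {grain: [rows]} dicts into one, grain by grain."""
    merged = defaultdict(list)
    for tables in per_run:
        for grain, rows in tables.items():
            merged[grain].extend(rows)
    return dict(merged)


def runs_per_pipeline(tables):
    """Example governance query: count recorded runs per pipeline (assumed 'pipeline' column)."""
    counts = defaultdict(int)
    for row in tables.get("runs", []):
        counts[row["pipeline"]] += 1
    return dict(counts)
```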

A medallion split works well. Keep an immutable json copy of each run as the audit log (bronze). Use spark_table (Delta, Unity Catalog) as the governed serving layer (silver), and build your dashboards on it as the gold layer. Because runs are immutable and append-only, both layers are simply accumulating logs, and no merge logic is needed.
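
Because the logs only grow, "current state" views can be derived at query time instead of being maintained with upserts. For example, a gold-layer query for the latest run per pipeline might look like the sketch below. latest_run_per_pipeline is hypothetical, and the pipeline and started_at columns are assumptions about the runs grain.

```python
def latest_run_per_pipeline(run_rows):
    """Derive the latest run per pipeline from an append-only run log."""
    latest = {}
    for row in run_rows:
        current = latest.get(row["pipeline"])
        # Keep the row with the greatest start time seen so far for this pipeline.
        if current is None or row["started_at"] > current["started_at"]:
            latest[row["pipeline"]] = row
    return latest
```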

