Connections (writers & stores)
A writer sends a RunRecord to a destination. Set a default once with configure_store(...), or pass dest=... per call to record_run. If neither is set, record_run raises an error.
cf.configure_store(writer="spark_table", spark=spark, catalog="gov", schema="conformare")
# ... or per call:
cf.record_run("customer_scoring", dest={"writer": "json", "path": "/mnt/gov/runs"})
Available writers
spark_table (default) — no blob access needed
Writes each grain to a managed table through the Spark table interface (createDataFrame(...).write.saveAsTable(...)), for environments where developers can write tables but not blobs. Tables are named {catalog}.{schema}.{prefix}{grain} (catalog and schema are optional) and have explicit, type-stable schemas. Each table is created on first write and appended to thereafter.
cf.configure_store(
    writer="spark_table", spark=spark,
    catalog="gov", schema="conformare", table_prefix="conformare_",
    format="delta",  # or "parquet" where Delta isn't available
)
Produces gov.conformare.conformare_runs, …_contexts, …_risks, …_sources, …_sinks, …_expectations, …_owners.
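The naming rule {catalog}.{schema}.{prefix}{grain} can be illustrated with a few lines of plain Python. This is only a sketch of the pattern described above, not the library's own code: table_name is a hypothetical helper, and the grain list is copied from the table names shown above.

```python
from typing import Optional

# Grain names taken from the table list above.
GRAINS = ("runs", "contexts", "risks", "sources", "sinks", "expectations", "owners")


def table_name(grain: str, catalog: Optional[str] = None,
               schema: Optional[str] = None, prefix: str = "") -> str:
    """Hypothetical helper: join the optional catalog and schema with the prefixed grain."""
    parts = [p for p in (catalog, schema) if p]  # skip catalog/schema when omitted
    parts.append(f"{prefix}{grain}")
    return ".".join(parts)


names = [table_name(g, "gov", "conformare", "conformare_") for g in GRAINS]
```

With catalog and schema omitted, the same helper returns just the prefixed grain, for example conformare_runs.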
json / parquet — portable file stores
For blob-capable environments. One immutable file per run (no transaction layer needed):
cf.configure_store(writer="json", path="/mnt/gov/runs") # {path}/{pipeline}/{run_id}.json
cf.configure_store(writer="parquet", path="/mnt/gov/runs") # {path}/{grain}/pipeline=…/{run_id}.parquet
parquet is typed and columnar — register an external table / view over the folder, or query it directly with DuckDB, Athena, Spark or Trino.
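To see why one file per run needs no transaction layer, here is a minimal sketch of the json layout {path}/{pipeline}/{run_id}.json written with exclusive-create mode. It is an illustration under assumptions, not the library's writer: write_run_once, the run id "r-001" and the payload fields are all hypothetical.

```python
import json
import tempfile
from pathlib import Path


def write_run_once(root: str, pipeline: str, run_id: str, payload: dict) -> Path:
    """Hypothetical helper: write {root}/{pipeline}/{run_id}.json exactly once."""
    target = Path(root) / pipeline / f"{run_id}.json"
    target.parent.mkdir(parents=True, exist_ok=True)
    # Mode "x" raises FileExistsError if the file already exists,
    # so an existing run file is never overwritten.
    with open(target, "x", encoding="utf-8") as fh:
        json.dump(payload, fh)
    return target


# Demo in a temporary folder; the payload is made up for illustration.
demo_root = tempfile.mkdtemp()
first = write_run_once(demo_root, "customer_scoring", "r-001", {"status": "ok"})
try:
    write_run_once(demo_root, "customer_scoring", "r-001", {"status": "changed"})
    overwritten = True
except FileExistsError:
    overwritten = False
```

Because every run has its own path and is written once, concurrent runs never touch the same file.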
callable — bring your own sink
Pass any function to forward the record (a warehouse loader, an OpenLineage emitter, a message queue):
def to_snowflake(record): ...
cf.configure_store(writer=to_snowflake)
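As a slightly fuller illustration of a callable sink, the sketch below appends each record to a JSON Lines file. The shape of the record passed to the callable is not specified here, so the code assumes it is a dict or otherwise JSON-friendly and falls back to str() for anything json cannot encode. make_jsonl_sink and the file path are hypothetical.

```python
import json
from typing import Any, Callable


def make_jsonl_sink(path: str) -> Callable[[Any], None]:
    """Hypothetical factory: return a callable that appends one JSON line per record."""
    def sink(record: Any) -> None:
        # Assumption: the record is a dict or similar; default=str converts
        # any value json cannot encode into its string form.
        with open(path, "a", encoding="utf-8") as fh:
            fh.write(json.dumps(record, default=str) + "\n")
    return sink


# Usage, following the configure_store call shown above (path is illustrative):
# cf.configure_store(writer=make_jsonl_sink("/tmp/conformare_runs.jsonl"))
```

A closure keeps the destination out of the call signature, so the sink still takes a single record argument like to_snowflake above.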
Reading runs back
cf.fleet.read_store(config) reads the grain tables back using the same config you wrote with (json, parquet, or spark_table). cf.fleet.merge_records([...]) builds the same tables from records you already hold in memory. Both return a {grain: [rows]} dict ready for the governance queries.
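The {grain: [rows]} shape is plain Python, so it can be inspected without any library helpers. A minimal sketch, assuming rows are dicts; the sample data and the rows_per_grain helper are hypothetical stand-ins, not real output.

```python
from typing import Any, Dict, List


def rows_per_grain(tables: Dict[str, List[Any]]) -> Dict[str, int]:
    """Hypothetical helper: count the rows held for each grain."""
    return {grain: len(rows) for grain, rows in tables.items()}


# Made-up stand-in for a {grain: [rows]} dict; field names are illustrative only.
sample_tables = {
    "runs": [{"run_id": "r-001"}, {"run_id": "r-002"}],
    "risks": [],
}
counts = rows_per_grain(sample_tables)
```

A quick count like this is a cheap check that every expected grain came back before running heavier queries.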
tables = cf.fleet.read_store({
    "writer": "spark_table", "spark": spark,
    "catalog": "gov", "schema": "conformare",
})
Recommended layout on Databricks
A medallion split works well: keep an immutable json copy per run as the audit log (bronze), and spark_table (Delta, Unity Catalog) as the governed serving layer (silver) that your dashboards query (gold). Because runs are immutable and append-only, both are just accumulating logs — no merge logic.