Fleet: risks across pipelines (streaming CLV)

Two PySpark pipelines for a streaming customer-lifetime-value programme (an opaque watch forecast + NPV model, and an audience-report pipeline with two sinks) recorded to one Spark-table store, with risks surfaced across both pipelines.

Open the full report ↗

Example code

Source: examples/example_fleet_streaming_clv.py

"""Fleet end to end: two PySpark pipelines for a movie-streaming customer-lifetime-value
programme, each recorded to a central Spark-table fleet store, then risks surfaced across
both pipelines from that store.

Scenario -- a streaming service whose opex scales with minutes streamed, mitigated by plan
(720p is cheaper to serve than 1080p, which in turn is cheaper to serve than 4k):

* **Value pipeline** (`value_model`) -- forecast watch behaviour with an *opaque* model
  (random expected minutes for years 1-3), apply per-year revenue, apply per-year opex from
  the cost to serve, then discount to a net present value at 9%. One customer-level output.
  Risks: churn is not modelled (no mitigation, future work), and the forecast is only at
  national granularity (content-type features would sharpen it).
* **Report pipeline** (`audience_reports`) -- split the audience into young and old (a
  context defines the threshold) and write a value report for each (two sinks). Risk:
  customer names are written to the reports when they should not be.

Each pipeline runs, then records its outcome. Finally we read the fleet store back and show
the risks from **both** pipelines side by side -- the point of the central store.

Uses the Spark-table writer (no blob access needed). Run:
    python examples/example_fleet_streaming_clv.py
"""

import os
import shutil

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import conformare as cf

# Plan economics. Key order matters: customers are assigned plans by cycling through
# the keys of PLAN_PRICE in insertion order.
# Monthly subscription price per plan.
PLAN_PRICE = {
    "720p": 8.0,
    "1080p": 12.0,
    "4k": 18.0,
}
# Cost to serve one streamed minute per plan (the opex mitigant).
PLAN_COST_PER_MIN = {
    "720p": 0.0001,
    "1080p": 0.0003,
    "4k": 0.0008,
}
# Nations that synthetic customers are spread across.
NATIONS = [
    "England",
    "Scotland",
    "Wales",
    "Northern Ireland",
]
# Annual discount rate used for the net present value.
DISCOUNT = 0.09


def _register_risks():
    """Register the four risk definitions this example refers to by id.

    Each entry is passed to ``cf.register_risk`` with its id as the positional argument
    and category, label, description and default severity as keyword arguments.
    """
    # (risk id, category, label, description, default severity)
    catalogue = (
        (
            "model.churn_not_modelled",
            "Model risk",
            "Churn not modelled",
            "Lifetime value assumes retention; churn is not considered.",
            "medium",
        ),
        (
            "model.national_granularity",
            "Model risk",
            "Forecast at national granularity",
            "Watch forecast is per nation; content-type features would add precision.",
            "low",
        ),
        (
            "privacy.names_written",
            "Privacy",
            "Customer names written to output",
            "Reports include customer names, which should not leave the pipeline.",
            "high",
        ),
        (
            "definition.sales_channel",
            "Definition",
            "Sales-channel codes were remapped",
            "Channel codes were redefined in 2026; older joins may be wrong.",
            "high",
        ),
    )
    for risk_id, category, label, description, severity in catalogue:
        cf.register_risk(
            risk_id,
            category=category,
            label=label,
            description=description,
            default_severity=severity,
        )


def _customers(spark, n: int = 400):
    """Build a deterministic synthetic customer DataFrame.

    Args:
        spark: Session used to create the DataFrame.
        n: Number of customers to generate (ids ``0`` to ``n - 1``).

    Returns:
        A DataFrame with columns ``customer_id``, ``customer_name``, ``nation``, ``age``,
        ``plan`` and ``minutes_month``.
    """
    plans = list(PLAN_PRICE)
    rows = [
        (
            i,
            f"Customer {i}",
            # Cycle through every configured nation / plan. Taking the modulus from the
            # collection length keeps this correct if either collection is edited.
            NATIONS[i % len(NATIONS)],
            18 + (i * 5) % 60,  # ages 18..77
            plans[i % len(plans)],
            200 + (i * 37) % 4000,  # 200..4199 minutes per month
        )
        for i in range(n)
    ]
    return spark.createDataFrame(
        rows, ["customer_id", "customer_name", "nation", "age", "plan", "minutes_month"]
    )


def _plan_map(d: dict):
    """Turn a ``{key: number}`` dict into a Spark map column of literals.

    Keys are passed through ``F.lit`` as-is; values are converted to ``float`` first.
    The result can be indexed with a column, e.g. ``_plan_map(d)[F.col("plan")]``.
    """
    # F.create_map takes a flat key, value, key, value, ... argument list.
    literals = [
        F.lit(part)
        for key, amount in d.items()
        for part in (key, float(amount))
    ]
    return F.create_map(*literals)


@cf.opaque
def forecast_watch(df):
    """Opaque ML forecaster: expected minutes watched in years 1-3 (random surrogate).

    Adds ``watch_y1``, ``watch_y2`` and ``watch_y3`` to ``df``. Each is twelve times
    ``minutes_month``, scaled by ``0.9 ** (year - 1)`` and by a seeded random factor
    ``0.6 + rand(42 + year)``.

    Marked opaque so the report shows a single boxed step (forecast in, watch_y1..3 out)
    instead of the internals -- the same way a real, externally trained model is wrapped.
    """
    forecast = df
    for year in range(1, 4):
        annual_minutes = F.col("minutes_month") * 12
        decay = 0.9 ** (year - 1)
        jitter = 0.6 + F.rand(42 + year)  # per-year seed
        forecast = forecast.withColumn(f"watch_y{year}", annual_minutes * decay * jitter)
    return forecast


def value_pipeline(spark, fleet_dest, out_dir) -> str:
    """Run the customer-lifetime-value pipeline and record it to the fleet store.

    Forecasts watch minutes for years 1-3 with the opaque ``forecast_watch`` model, derives
    per-year revenue and opex from the customer's plan, discounts ``revenue - opex`` by
    ``(1 + DISCOUNT) ** year`` and writes one ``clv`` value per customer to
    ``<out_dir>/clv_output`` as parquet.

    Args:
        spark: Session used to build the synthetic customers.
        fleet_dest: Fleet-store destination passed to ``cf.record_run``.
        out_dir: Directory under which ``clv_output`` is written (overwritten if present).

    Returns:
        The value returned by ``cf.record_run`` for the ``value_model`` pipeline.
    """
    cf.store.clear()
    cf.reset_context()
    cf.trackSpark()
    try:
        cf.set_profiles({"*": [cf.rowCount]})
        cf.describe_process("Customer lifetime value for a movie-streaming service (3-year NPV).")

        customers = _customers(spark)
        cost = _plan_map(PLAN_COST_PER_MIN)
        price = _plan_map(PLAN_PRICE)

        with (
            cf.describe("Forecast watch behaviour", purpose="Expected minutes watched, years 1-3"),
            cf.risk("model.churn_not_modelled", note="future development planned"),  # no mitigation
        ):
            watched = forecast_watch(customers)  # opaque model boundary

        with (
            cf.describe("Customer lifetime value", purpose="Revenue - opex, discounted at 9%"),
            cf.risk(
                "model.national_granularity", mitigation="add content-type features to the model"
            ),
        ):
            v = watched
            for y in (1, 2, 3):
                # Revenue is twelve monthly payments; opex is forecast minutes x cost per minute.
                v = v.withColumn(f"rev_y{y}", price[F.col("plan")] * 12)
                v = v.withColumn(f"opex_y{y}", F.col(f"watch_y{y}") * cost[F.col("plan")])
            # sum() starts from 0, so the result is a single Column expression.
            npv = sum(
                (F.col(f"rev_y{y}") - F.col(f"opex_y{y}")) / ((1 + DISCOUNT) ** y)
                for y in (1, 2, 3)
            )
            clv = v.withColumn("clv", npv).select("customer_id", "nation", "plan", "clv")

        clv.write.mode("overwrite").parquet(os.path.join(out_dir, "clv_output"))
    finally:
        # Stop tracking before the fleet writer runs its own Spark ops -- and also when a
        # step above raises, so later Spark calls (e.g. cleanup) are not tracked.
        cf.restore()
    return cf.record_run("value_model", dest=fleet_dest, env="prod", git_sha="value-v1.0.0")


def report_pipeline(spark, fleet_dest, out_dir) -> str:
    """Run the audience-report pipeline (two sinks) and record it to the fleet store.

    Splits the synthetic customers into young (``age < 30``) and old (``age >= 30``) and
    writes one parquet report per segment to ``<out_dir>/young_report`` and
    ``<out_dir>/old_report``. Both reports deliberately include ``customer_name`` to make
    the ``privacy.names_written`` risk concrete.

    Args:
        spark: Session used to build the synthetic customers.
        fleet_dest: Fleet-store destination passed to ``cf.record_run``.
        out_dir: Directory under which the two reports are written (overwritten if present).

    Returns:
        The value returned by ``cf.record_run`` for the ``audience_reports`` pipeline.
    """
    cf.store.clear()
    cf.reset_context()
    cf.trackSpark()
    try:
        cf.set_profiles({"*": [cf.rowCount]})
        cf.describe_process("Audience value reports for the streaming CLV programme.")

        customers = _customers(spark)

        with cf.describe("Audience segmentation", purpose="young = age < 30; old = age >= 30"):
            young = customers.filter(F.col("age") < 30)
            old = customers.filter(F.col("age") >= 30)

        with cf.risk("privacy.names_written", note="customer_name reaches both written reports"):
            # the reports carry customer_name -- the risk made concrete (and flagged as a
            # sensitive column reaching a sink)
            young_report = young.select("customer_id", "customer_name", "nation", "plan")
            old_report = old.select("customer_id", "customer_name", "nation", "plan")
            young_report.write.mode("overwrite").parquet(os.path.join(out_dir, "young_report"))
            old_report.write.mode("overwrite").parquet(os.path.join(out_dir, "old_report"))
    finally:
        # Stop tracking before the fleet writer runs its own Spark ops -- and also when a
        # step above raises, so later Spark calls (e.g. cleanup) are not tracked.
        cf.restore()
    return cf.record_run("audience_reports", dest=fleet_dest, env="prod", git_sha="report-v1.0.0")


def _drop_fleet_tables(spark, table_prefix):
    """Drop the fleet table for every grain in ``cf.fleet.GRAINS`` under ``table_prefix``."""
    for grain in cf.fleet.GRAINS:
        spark.sql(f"DROP TABLE IF EXISTS {table_prefix}{grain}")


def main(out_dir=None):
    """Run both pipelines, print the cross-fleet risk views and render the HTML dashboard.

    Args:
        out_dir: Directory for the pipeline outputs. Defaults to ``output/streaming_clv``
            two directory levels above this file. An existing directory at that path is
            deleted and recreated.

    Returns:
        The output directory that was used.
    """
    out_dir = out_dir or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "output", "streaming_clv"
    )
    if os.path.exists(out_dir):
        shutil.rmtree(out_dir)
    os.makedirs(out_dir, exist_ok=True)

    spark = (
        SparkSession.builder.master("local[1]")
        .appName("conformare-clv-fleet")
        .config("spark.ui.enabled", "false")
        .config("spark.sql.shuffle.partitions", "4")
        .getOrCreate()
    )
    # Single source for the prefix, shared by the fleet writer config and the cleanup.
    table_prefix = "clv_fleet_"

    # Everything after session creation sits inside the try, so the session is stopped
    # even if setup fails.
    try:
        spark.sparkContext.setLogLevel("ERROR")
        _register_risks()

        # Spark-table fleet store (no blob access). Managed parquet tables, prefixed + cleaned up.
        fleet_dest = {
            "writer": "spark_table",
            "spark": spark,
            "table_prefix": table_prefix,
            "format": "parquet",
        }
        # Start from an empty store in case an earlier run left tables behind.
        _drop_fleet_tables(spark, table_prefix)

        value_pipeline(spark, fleet_dest, out_dir)
        report_pipeline(spark, fleet_dest, out_dir)

        # --- the point: surface risks across BOTH pipelines from the one fleet store ---
        tables = cf.fleet.read_store(fleet_dest)

        print("runs recorded to the fleet store:")
        for r in sorted(tables["runs"], key=lambda r: r["pipeline"]):
            print(
                f"  {r['pipeline']:18} status={r['status']:8} "
                f"risks={r['risks_total']} sensitive_written={r['sensitive_written']} "
                f"sinks={r['n_sinks']}"
            )

        print("\nrisks across the fleet (every pipeline, one place):")
        for r in sorted(tables["risks"], key=lambda r: (r["pipeline"], r["severity"])):
            mit = r["mitigations"][0] if r["mitigations"] else "(none)"
            print(
                f"  [{r['pipeline']:16}] {r['severity']:6} {r['label']:32} "
                f"mitigated={r['mitigated']!s:5} ({mit})"
            )

        offenders = [r for r in tables["sinks"] if r.get("n_sensitive", 0) > 0]
        print(
            f"\nsinks carrying sensitive columns: {[(s['pipeline'], s['node']) for s in offenders]}"
        )

        # A static risk for an externally-produced table -- Data Engineering (who don't use
        # this package) declare it once; the fleet then treats it like any upstream risk.
        cf.record_source_risk(
            "lake.sales_channel",
            "definition.sales_channel",
            column="channel",
            owner="Data Engineering",
            note="channel codes were remapped in 2026 -- see the DE wiki",
            dest=fleet_dest,
        )

        # Inherited risk: sourcing the value pipeline's CLV output pulls in its upstream
        # model risks; sourcing the external table pulls in the static risk.
        clv_loc = os.path.join(out_dir, "clv_output")
        upstream = cf.check_upstream_risks(
            [clv_loc, "lake.sales_channel"], dest=fleet_dest, warn=False
        )
        print("\nupstream-risk check on tables you might source:")
        for u in upstream:
            print(
                f"  {os.path.basename(u['location']):16} {u['n_risks']} risk(s): "
                f"{u['direct']} direct, {u['indirect']} indirect, {u['process']} process, "
                f"{u['static']} static (distances {u['distances']})"
            )
            for r in u["risks"]:
                d = "" if r["distance"] is None else f" d{r['distance']}"
                print(f"      - [{r['kind']}{d}] {r['label']} ({r['severity']})")

        # render the cross-fleet view as a self-contained HTML dashboard (for the docs page)
        report = os.path.join(os.path.dirname(out_dir), "streaming_clv_fleet_dashboard.html")
        cf.fleet.to_html(
            tables, report, title="Streaming CLV programme - fleet dashboard", upstream=upstream
        )
        print(f"wrote {report}")
    finally:
        # Nested so the session is stopped even if dropping a table fails.
        try:
            _drop_fleet_tables(spark, table_prefix)
        finally:
            spark.stop()
    return out_dir


# Script entry point: run the example with the default output directory.
if __name__ == "__main__":
    main()

Output report

Open in a new tab ↗


This site uses Just the Docs, a documentation theme for Jekyll.