Streaming analytics (Narwhals)

A fuller end-to-end pipeline, profiled at every step.

Open the full report ↗

Example code

Source: examples/example_streaming.py

"""Extended example: a video-streaming analytics pipeline.

Generates synthetic customers / streams / ratings / payments with the stdlib
``random`` module, writes them to CSVs (so the reads are captured as sources),
runs a multi-report Narwhals pipeline tracked by conformare -- with described
contexts, risks + mitigations, nested tracked functions, and a final write -- and
emits an interactive HTML report.

Run:  python examples/example_streaming.py
Then open output/streaming_report.html (under the repository root) in a browser.
"""

import os
import random
import tempfile

import narwhals as nw
import pandas as pd

import conformare as cf

# Regions treated as "UK" by keep_uk(); also over-weighted when sampling
# customer regions in generate().
UK_REGIONS = ["England", "Scotland", "Wales", "Northern Ireland"]
# Non-UK regions mixed into the synthetic customers so the UK filter has
# something to remove.
NON_UK = ["Ireland", "France", "Germany", "Spain", "United States"]
# Values sampled for the payments table's "payment_method" column.
PAYMENT_METHODS = ["credit", "debit", "GooglePay", "Paypal"]
# (tier, cost) pairs; generate() unpacks them into the "tier" and "cost" columns.
TIERS = [("Basic", 4), ("Standard", 7), ("Premium", 14)]
# (movie_id, movie_title, length) triples; generate() scales the length by the
# sampled completion percentage to produce "watch_minutes".
MOVIES = [
    (1, "The Quiet Sea", 128),
    (2, "Neon Highway", 95),
    (3, "Last Orbit", 142),
    (4, "Paper Lanterns", 88),
    (5, "The Cartographer", 117),
    (6, "Static Bloom", 101),
    (7, "Granite Hearts", 134),
    (8, "Midnight Allotment", 76),
    (9, "Velvet Circuit", 109),
    (10, "The Long Commute", 92),
]
# First names combined with LAST to build synthetic "full_name" / "email" values.
FIRST = [
    "Ana",
    "Ben",
    "Cara",
    "Dan",
    "Eve",
    "Finn",
    "Gwen",
    "Hari",
    "Isla",
    "Jon",
    "Kira",
    "Liam",
    "Maya",
    "Noah",
    "Orla",
    "Priya",
    "Quinn",
    "Ravi",
    "Sian",
    "Tom",
]
# Surnames combined with FIRST to build synthetic "full_name" / "email" values.
LAST = [
    "Adams",
    "Bevan",
    "Clarke",
    "Davies",
    "Evans",
    "Ford",
    "Grant",
    "Hughes",
    "Ito",
    "Jones",
    "Khan",
    "Lewis",
    "Morgan",
    "Nolan",
    "Owen",
    "Patel",
]


def generate(tmp, seed=42):
    """Build the four synthetic tables and save each one as ``<name>.csv`` in ``tmp``.

    The module-level ``random`` generator is seeded with ``seed`` first, so the
    same seed always yields the same rows.

    Args:
        tmp: Directory the CSV files are written into.
        seed: Value passed to ``random.seed``.

    Returns:
        A dict mapping each table name (``"customers"``, ``"streams"``,
        ``"ratings"``, ``"payments"``) to the path of its CSV file.
    """
    random.seed(seed)

    # 20 UK entries against 5 non-UK ones, so ~80% of the draws are UK regions.
    weighted_regions = UK_REGIONS * 5 + NON_UK

    def make_customer(customer_id):
        first, last = random.choice(FIRST), random.choice(LAST)
        return {
            "customer_id": customer_id,
            "full_name": f"{first} {last}",
            "email": f"{first.lower()}.{last.lower()}{customer_id}@example.com",
            "age": random.randint(13, 80),
            "region": random.choice(weighted_regions),
        }

    def make_stream(stream_id):
        viewer = random.randint(1, 1000)
        movie_id, movie_title, runtime = random.choice(MOVIES)
        percent = random.randint(1, 100)
        return {
            "stream_id": stream_id,
            "customer_id": viewer,
            "movie_id": movie_id,
            "movie_title": movie_title,
            "watch_minutes": round(runtime * percent / 100),
            "pct_complete": percent,
        }

    def make_rating():
        return {
            "customer_id": random.randint(1, 1000),
            "movie_id": random.choice(MOVIES)[0],
            "rating": random.randint(1, 5),
        }

    def make_payment(customer_id):
        tier_name, price = random.choice(TIERS)
        return {
            "customer_id": customer_id,
            "payment_method": random.choice(PAYMENT_METHODS),
            "tier": tier_name,
            "cost": price,
        }

    # The tables are built in this fixed order because they all draw from the
    # same seeded generator; reordering would change the generated data.
    tables = {
        "customers": [make_customer(cid) for cid in range(1, 1001)],
        "streams": [make_stream(sid) for sid in range(1, 3001)],
        "ratings": [make_rating() for _ in range(1500)],
        "payments": [make_payment(cid) for cid in range(1, 1001)],
    }

    written = {}
    for table_name, records in tables.items():
        csv_path = os.path.join(tmp, f"{table_name}.csv")
        pd.DataFrame(records).to_csv(csv_path, index=False)
        written[table_name] = csv_path
    return written


# --- nested tracked functions (show up as call paths in the report) ----------
def keep_uk(df):
    """Return only the rows of ``df`` whose ``region`` is listed in ``UK_REGIONS``."""
    in_uk = nw.col("region").is_in(UK_REGIONS)
    return df.filter(in_uk)


def clean_customers(df):
    """Keep customers aged 24 or over, then restrict them to UK regions.

    The region step is delegated to ``keep_uk`` so the nested call appears as
    the path clean_customers().keep_uk().
    """
    return keep_uk(df.filter(nw.col("age") >= 24))


def main(out=None):
    """Run the streaming-analytics example and write the HTML report.

    Generates the synthetic CSVs into a fresh temporary directory, configures
    conformare tracking, builds the five report frames inside described
    contexts, writes the per-customer export CSV and finally emits the report.

    Args:
        out: Path of the HTML report to write. When falsy, defaults to
            ``output/streaming_report.html`` two directory levels above this
            file. A bare filename (no directory part) is accepted.

    Returns:
        The path the report was written to.
    """
    out = out or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        "output",
        "streaming_report.html",
    )
    # A bare filename has an empty directory part, and os.makedirs("") raises
    # FileNotFoundError, so only create the directory when there is one.
    out_dir = os.path.dirname(out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    # NOTE(review): this temporary directory (input CSVs plus the export CSV)
    # is never removed by this function -- confirm whether that is intended.
    tmp = tempfile.mkdtemp(prefix="conformare_stream_")
    paths = generate(tmp)

    cf.trackNarwhals()
    cf.track_functions(True)
    cf.set_profiles(
        {
            "*": [
                cf.rowCount,
                cf.dataSize,
                cf.histogram(columns="all"),
                cf.nullFraction(columns="all"),
                cf.iqrOutliers(columns="all"),
            ],
        }
    )
    cf.profile_sources(True)  # profile the raw read frames too
    cf.mark_sensitive("region", tag="location", category="Location", severity="low")

    cf.describe_process(
        "Nightly video-streaming analytics: clean UK customers, then build watch-time, "
        "ratings, payment and per-customer export reports from the streams/ratings/"
        "payments tables.",
        risks=cf.risk(
            "compliance.gdpr",
            note="processes UK resident PII end-to-end",
            mitigation="Pipeline covered by the annual DPIA; outputs reviewed before sharing",
            owner="data-governance",
        ),
    )

    # Read the generated CSVs back with pandas and wrap them as Narwhals frames.
    customers = nw.from_native(pd.read_csv(paths["customers"]))
    streams = nw.from_native(pd.read_csv(paths["streams"]))
    ratings = nw.from_native(pd.read_csv(paths["ratings"]))
    payments = nw.from_native(pd.read_csv(paths["payments"]))

    with cf.describe(
        "Clean customers",
        purpose="Adults (24+) resident in the UK only",
        definition_owner="data-governance",
        risks=cf.risk(
            "privacy.pii_exposure",
            "compliance.gdpr",
            note="name + email present in the customer table",
            mitigation="Drop email before publishing reports",
            owner="data-governance",
        ),
    ):
        uk_adults = clean_customers(customers)

    with cf.describe(
        "Watch time by region",
        purpose="Average watch minutes per UK region",
        definition_owner="analytics-product",
        details="""
## Watch time by region

Joins the cleaned UK-adult customers to the `streams` table and reports the
**mean `watch_minutes`** per billing region.

### Inputs
- `uk_adults` — adults (24+) resident in the UK
- `streams` — one row per viewing session

### Assumptions & caveats
1. `region` is the customer's **billing** region, not their viewing location.
2. Sessions under a minute are kept — they are valid "channel surfing" events.
3. Regions with very few customers are still reported (no minimum-cohort filter).

> Downstream dashboards should treat single-customer regions as low-confidence.
""",
    ):
        cust_streams = uk_adults.join(streams, on="customer_id", how="inner")
        watch_by_region = cust_streams.group_by("region").agg(
            nw.col("watch_minutes").mean().alias("avg_watch_minutes")
        )

    with cf.describe(
        "Ratings by movie",
        purpose="Average viewer rating per title",
        definition_owner="content-team",
    ):
        rating_by_movie = ratings.group_by("movie_id").agg(
            nw.col("rating").mean().alias("avg_rating")
        )

    with cf.describe(
        "Payment by type",
        purpose="Average spend per payment method",
        definition_owner="finance",
    ):
        pay_by_type = payments.group_by("payment_method").agg(
            nw.col("cost").mean().alias("avg_cost")
        )

    with cf.describe(
        "Payment by completion",
        purpose="Average spend grouped by how much of a title is watched",
        definition_owner="finance",
        risks=cf.risk("quality.outliers", note="very short sessions skew completion bands"),
    ):
        pay_streams = payments.join(streams, on="customer_id", how="inner")
        # Floor pct_complete to the lower edge of its 25-point band
        # (0, 25, 50, 75, and 100 for fully watched titles).
        pay_streams = pay_streams.with_columns(completion_band=(nw.col("pct_complete") // 25) * 25)
        pay_by_completion = pay_streams.group_by("completion_band").agg(
            nw.col("cost").mean().alias("avg_cost")
        )

    with cf.describe(
        "Customer watch-time export",
        purpose="Per-customer total watch time for downstream use",
        definition_owner="analytics-product",
        # This risk is declared without an owner=, unlike the risks above.
        risks=cf.risk("privacy.pii_exposure", mitigation="Hash customer names on export"),
    ):
        per_customer = (
            uk_adults.join(streams, on="customer_id", how="inner")
            .group_by("customer_id")
            .agg(nw.col("watch_minutes").sum().alias("total_watch_minutes"))
        )
        export = uk_adults.join(per_customer, on="customer_id", how="inner").select(
            "customer_id", "full_name", "region", "age", "total_watch_minutes"
        )

    # The final write of the pipeline; the file lands in the temp directory.
    export.write_csv(os.path.join(tmp, "customer_watch_time.csv"))

    cf.track_functions(False)
    html = cf.to_html(path=out, title="Video streaming analytics — conformare report")
    m = cf.build_model(cf.store)
    print(f"wrote {out} ({len(html):,} bytes)")
    print(
        f"  nodes={m['stats']['nodes']}, events={len(cf.lineage())}, "
        f"columns={m['stats']['columns']}, risks={m['stats']['risks']}"
    )
    # silence "unused" linting for the report frames built purely for lineage
    _ = (watch_by_region, rating_by_movie, pay_by_type, pay_by_completion, export)
    return out


# Run the pipeline with the default report path when executed as a script.
if __name__ == "__main__":
    main()

Output report

Open in a new tab ↗


This site uses Just the Docs, a documentation theme for Jekyll.