Streaming analytics (PySpark)

The same pipeline rewritten for PySpark, tracked in place with no changes to the transformation code.

Open the full report ↗

Example code

Source: examples/example_streaming_spark.py

"""PySpark twin of example_streaming.py.

Same synthetic data and the same pipeline, written against PySpark instead of
Narwhals, to check that conformare produces comparable lineage / column-lineage /
governance across backends.

Two deliberate differences from the Narwhals version:
  * profiling is disabled globally (``set_profiles({})``), and
  * each ``describe()`` context force-profiles **only its final node**
    (``force_profile(..., only="last", cache=True)``), so on Spark we pay for one
    cached action per context rather than profiling every step.

Run:  python examples/example_streaming_spark.py
Then open output/streaming_report_spark.html in a browser.
"""

import contextlib
import os
import shutil
import tempfile

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import conformare as cf
import example_streaming as base  # reuse the synthetic data generator + constants

# Reuse the region list from the Narwhals example so both backends filter on
# exactly the same set of UK regions (consumed by keep_uk below).
UK_REGIONS = base.UK_REGIONS


# --- nested tracked functions (mirror the Narwhals version) ------------------
def keep_uk(df):
    """Return only the rows of *df* whose ``region`` appears in ``UK_REGIONS``."""
    region_is_uk = F.col("region").isin(UK_REGIONS)
    return df.filter(region_is_uk)


def clean_customers(df):
    """Keep customers aged 24 or over who live in a UK region.

    The age filter is applied first and the region filter is delegated to
    ``keep_uk``, so the tracked function path reads
    ``clean_customers().keep_uk()``.
    """
    return keep_uk(df.filter(F.col("age") >= 24))


def main(out=None):
    """Run the PySpark streaming pipeline under conformare and write an HTML report.

    Generates the synthetic CSV inputs into a fresh temporary directory, starts a
    local single-core SparkSession, enables conformare's Spark and function
    tracking, runs six ``describe()`` contexts (each force-profiling only its last
    node), writes the per-customer export, renders the report and prints summary
    counts to stdout.

    Teardown (``cf.restore()``, ``spark.stop()``, temp-dir removal) runs on exit
    whether the pipeline succeeds or raises.

    Args:
        out: Destination path for the HTML report. Defaults to
            ``output/streaming_report_spark.html`` under the parent of this
            script's directory. Missing parent directories are created.

    Returns:
        The path the report was written to.
    """
    out = out or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        "output",
        "streaming_report_spark.html",
    )
    # A bare filename has an empty dirname, and os.makedirs("") raises.
    out_dir = os.path.dirname(out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # Teardown lives on an ExitStack so it also runs when the pipeline raises.
    # Callbacks unwind in reverse registration order: cf.restore(), then
    # spark.stop(), then the temp dir is removed -- the same order as a clean run.
    with contextlib.ExitStack() as cleanup:
        # mkdtemp() leaves deletion to the caller.
        tmp = tempfile.mkdtemp(prefix="conformare_stream_spark_")
        cleanup.callback(shutil.rmtree, tmp, ignore_errors=True)
        paths = base.generate(tmp)

        spark = (
            SparkSession.builder.master("local[1]")
            .appName("conformare-streaming-spark")
            .config("spark.ui.enabled", "false")
            .config("spark.sql.shuffle.partitions", "4")
            .getOrCreate()
        )
        cleanup.callback(spark.stop)
        spark.sparkContext.setLogLevel("ERROR")

        cf.trackSpark()
        cleanup.callback(cf.restore)  # unpatch + release cached frames
        cf.track_functions(True)
        cf.set_profiles({})  # profiling OFF by default
        cf.mark_sensitive("region", tag="location", category="Location", severity="low")

        cf.describe_process(
            "Nightly video-streaming analytics: clean UK customers, then build watch-time, "
            "ratings, payment and per-customer export reports from the streams/ratings/"
            "payments tables.",
            risks=cf.risk(
                "compliance.gdpr",
                note="processes UK resident PII end-to-end",
                mitigation="Pipeline covered by the annual DPIA; outputs reviewed before sharing",
                owner="data-governance",
            ),
        )

        def rd(p):
            # Read a CSV with a header row, letting Spark infer column types.
            return spark.read.option("header", True).option("inferSchema", True).csv(p)

        customers = rd(paths["customers"])
        streams = rd(paths["streams"])
        ratings = rd(paths["ratings"])
        payments = rd(paths["payments"])

        # profilers applied only to the last node of each context
        PROF = [
            cf.rowCount,
            cf.dataSize,
            cf.histogram(columns="all"),
            cf.nullFraction(columns="all"),
            cf.iqrOutliers(columns="all"),
        ]

        with (
            cf.describe(
                "Clean customers",
                purpose="Adults (24+) resident in the UK only",
                definition_owner="data-governance",
                risks=cf.risk(
                    "privacy.pii_exposure",
                    "compliance.gdpr",
                    note="name + email present in the customer table",
                    mitigation="Drop email before publishing reports",
                    owner="data-governance",
                ),
            ),
            cf.force_profile(*PROF, only="last", cache=True),
        ):
            uk_adults = clean_customers(customers)

        with (
            cf.describe(
                "Watch time by region",
                purpose="Average watch minutes per UK region",
                definition_owner="analytics-product",
            ),
            cf.force_profile(*PROF, only="last", cache=True),
        ):
            cust_streams = uk_adults.join(streams, on="customer_id", how="inner")
            watch_by_region = cust_streams.groupBy("region").agg(
                F.mean(F.col("watch_minutes")).alias("avg_watch_minutes")
            )

        with (
            cf.describe(
                "Ratings by movie",
                purpose="Average viewer rating per title",
                definition_owner="content-team",
            ),
            cf.force_profile(*PROF, only="last", cache=True),
        ):
            rating_by_movie = ratings.groupBy("movie_id").agg(
                F.mean(F.col("rating")).alias("avg_rating")
            )

        with (
            cf.describe(
                "Payment by type",
                purpose="Average spend per payment method",
                definition_owner="finance",
            ),
            cf.force_profile(*PROF, only="last", cache=True),
        ):
            pay_by_type = payments.groupBy("payment_method").agg(
                F.mean(F.col("cost")).alias("avg_cost")
            )

        with (
            cf.describe(
                "Payment by completion",
                purpose="Average spend grouped by how much of a title is watched",
                definition_owner="finance",
                risks=cf.risk(
                    "quality.outliers",
                    note="very short sessions skew completion bands",
                ),
            ),
            cf.force_profile(*PROF, only="last", cache=True),
        ):
            pay_streams = payments.join(streams, on="customer_id", how="inner")
            # Bucket pct_complete into bands of width 25 (0, 25, 50, ...).
            pay_streams = pay_streams.withColumn(
                "completion_band", (F.floor(F.col("pct_complete") / 25) * 25)
            )
            pay_by_completion = pay_streams.groupBy("completion_band").agg(
                F.mean(F.col("cost")).alias("avg_cost")
            )

        with (
            cf.describe(
                "Customer watch-time export",
                purpose="Per-customer total watch time for downstream use",
                definition_owner="analytics-product",
                risks=cf.risk(
                    "privacy.pii_exposure",
                    mitigation="Hash customer names on export",
                ),
            ),
            cf.force_profile(*PROF, only="last", cache=True),
        ):
            per_customer = (
                uk_adults.join(streams, on="customer_id", how="inner")
                .groupBy("customer_id")
                .agg(F.sum(F.col("watch_minutes")).alias("total_watch_minutes"))
            )
            # NOTE(review): the mitigation above says names are hashed on export,
            # but "full_name" is selected as-is here -- confirm whether hashing
            # happens downstream or the mitigation text needs updating.
            export = uk_adults.join(per_customer, on="customer_id", how="inner").select(
                "customer_id", "full_name", "region", "age", "total_watch_minutes"
            )

        # Written inside the temp dir, so it is removed again during teardown.
        export.write.mode("overwrite").csv(os.path.join(tmp, "customer_watch_time_spark"))

        cf.track_functions(False)
        html = cf.to_html(path=out, title="Video streaming analytics (PySpark) — conformare report")
        m = cf.build_model(cf.store)
        print(f"wrote {out} ({len(html):,} bytes)")
        print(
            f"  nodes={m['stats']['nodes']}, events={len(cf.lineage())}, "
            f"columns={m['stats']['columns']}, risks={m['stats']['risks']}, "
            f"created_cols={len(m['columnLineage']['catalog'])}"
        )
        profiled = sum(1 for n in m["nodes"] if n["rows"] is not None)
        print(f"  profiled nodes (force_profile only=last): {profiled}")
        # Keep references to the per-context results (otherwise unused here).
        _ = (watch_by_region, rating_by_movie, pay_by_type, pay_by_completion, export)
    return out


# Script entry point: run with the default output path; main()'s return value
# (the report path) is not used here.
if __name__ == "__main__":
    main()

Output report

Open in a new tab ↗


This site uses Just the Docs, a documentation theme for Jekyll.