Streaming analytics (PySpark)
The same pipeline running on PySpark, tracked in place with no changes to the pipeline code.
Example code
Source: examples/example_streaming_spark.py
"""PySpark twin of example_streaming.py.
Same synthetic data and the same pipeline, written against PySpark instead of
Narwhals, to check that conformare produces comparable lineage / column-lineage /
governance across backends.
Two deliberate differences from the Narwhals version:
* profiling is disabled globally (``set_profiles({})``), and
* each ``describe()`` context force-profiles **only its final node**
(``force_profile(..., only="last", cache=True)``), so on Spark we pay for one
cached action per context rather than profiling every step.
Run: python examples/example_streaming_spark.py
Then open output/streaming_report_spark.html (under the repository root) in a browser.
"""
import os
import tempfile
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import conformare as cf
import example_streaming as base # reuse the synthetic data generator + constants
# Region allow-list shared with the Narwhals example (example_streaming), so the
# UK filter below selects the same regions on both backends.
UK_REGIONS = base.UK_REGIONS
# --- nested tracked functions (mirror the Narwhals version) ------------------
def keep_uk(df):
    """Return the rows of *df* whose ``region`` value is listed in ``UK_REGIONS``."""
    in_uk = F.col("region").isin(UK_REGIONS)
    return df.filter(in_uk)
def clean_customers(df):
    """Keep customers aged 24 or over who live in a UK region.

    The region filter is delegated to ``keep_uk()`` (rather than inlined) so the
    tracked call path reads ``clean_customers().keep_uk()``.
    """
    is_adult = F.col("age") >= 24
    return keep_uk(df.filter(is_adult))
def main(out=None):
    """Run the PySpark streaming pipeline under conformare and write the report.

    Generates the shared synthetic tables (``example_streaming.generate``) into a
    fresh temp directory and runs six ``cf.describe`` contexts over them on a
    local Spark session. Writes the per-customer export as CSV into that temp
    directory, then renders the conformare HTML report and prints a summary.

    Spark is stopped and ``cf.restore()`` is called even if the pipeline raises,
    so a failed run does not leave a live session or conformare's patches behind
    in the calling process.

    Args:
        out: Destination of the HTML report. When falsy, defaults to
            ``output/streaming_report_spark.html`` under the parent of this
            file's directory. Missing parent directories are created.

    Returns:
        The report path that was written (``out`` after defaulting).
    """
    import contextlib  # local import: only main needs ExitStack

    out = out or os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        "output",
        "streaming_report_spark.html",
    )
    os.makedirs(os.path.dirname(out), exist_ok=True)

    # NOTE(review): this directory (generated inputs + CSV export) is not removed
    # after the run; delete it manually if disk usage matters.
    tmp = tempfile.mkdtemp(prefix="conformare_stream_spark_")
    paths = base.generate(tmp)

    with contextlib.ExitStack() as cleanup:
        spark = (
            SparkSession.builder.master("local[1]")
            .appName("conformare-streaming-spark")
            .config("spark.ui.enabled", "false")
            .config("spark.sql.shuffle.partitions", "4")
            .getOrCreate()
        )
        # ExitStack unwinds LIFO, on success and on error alike: cf.restore()
        # runs first, then spark.stop() -- the same order as before.
        cleanup.callback(spark.stop)
        spark.sparkContext.setLogLevel("ERROR")

        cf.trackSpark()
        cleanup.callback(cf.restore)  # unpatch + release cached frames
        cf.track_functions(True)
        try:
            cf.set_profiles({})  # profiling OFF by default
            cf.mark_sensitive("region", tag="location", category="Location", severity="low")
            cf.describe_process(
                "Nightly video-streaming analytics: clean UK customers, then build watch-time, "
                "ratings, payment and per-customer export reports from the streams/ratings/"
                "payments tables.",
                risks=cf.risk(
                    "compliance.gdpr",
                    note="processes UK resident PII end-to-end",
                    mitigation="Pipeline covered by the annual DPIA; outputs reviewed before sharing",
                    owner="data-governance",
                ),
            )

            def rd(p):
                # CSV with a header row; column types are inferred by Spark.
                return spark.read.option("header", True).option("inferSchema", True).csv(p)

            customers = rd(paths["customers"])
            streams = rd(paths["streams"])
            ratings = rd(paths["ratings"])
            payments = rd(paths["payments"])

            # profilers applied only to the last node of each context
            # (force_profile(..., only="last", cache=True) below).
            PROF = [
                cf.rowCount,
                cf.dataSize,
                cf.histogram(columns="all"),
                cf.nullFraction(columns="all"),
                cf.iqrOutliers(columns="all"),
            ]

            with (
                cf.describe(
                    "Clean customers",
                    purpose="Adults (24+) resident in the UK only",
                    definition_owner="data-governance",
                    risks=cf.risk(
                        "privacy.pii_exposure",
                        "compliance.gdpr",
                        note="name + email present in the customer table",
                        mitigation="Drop email before publishing reports",
                        owner="data-governance",
                    ),
                ),
                cf.force_profile(*PROF, only="last", cache=True),
            ):
                uk_adults = clean_customers(customers)

            with (
                cf.describe(
                    "Watch time by region",
                    purpose="Average watch minutes per UK region",
                    definition_owner="analytics-product",
                ),
                cf.force_profile(*PROF, only="last", cache=True),
            ):
                cust_streams = uk_adults.join(streams, on="customer_id", how="inner")
                watch_by_region = cust_streams.groupBy("region").agg(
                    F.mean(F.col("watch_minutes")).alias("avg_watch_minutes")
                )

            with (
                cf.describe(
                    "Ratings by movie",
                    purpose="Average viewer rating per title",
                    definition_owner="content-team",
                ),
                cf.force_profile(*PROF, only="last", cache=True),
            ):
                rating_by_movie = ratings.groupBy("movie_id").agg(
                    F.mean(F.col("rating")).alias("avg_rating")
                )

            with (
                cf.describe(
                    "Payment by type",
                    purpose="Average spend per payment method",
                    definition_owner="finance",
                ),
                cf.force_profile(*PROF, only="last", cache=True),
            ):
                pay_by_type = payments.groupBy("payment_method").agg(
                    F.mean(F.col("cost")).alias("avg_cost")
                )

            with (
                cf.describe(
                    "Payment by completion",
                    purpose="Average spend grouped by how much of a title is watched",
                    definition_owner="finance",
                    risks=cf.risk("quality.outliers", note="very short sessions skew completion bands"),
                ),
                cf.force_profile(*PROF, only="last", cache=True),
            ):
                # NOTE(review): payments and streams are joined on customer_id
                # alone, so a customer with several payments and several streams
                # yields every payment x stream pairing and avg_cost is weighted
                # by stream count. Kept as-is to mirror the Narwhals version --
                # confirm this is the intended grain.
                pay_streams = payments.join(streams, on="customer_id", how="inner")
                # Bucket pct_complete into 25-point bands (0, 25, 50, ...).
                pay_streams = pay_streams.withColumn(
                    "completion_band", (F.floor(F.col("pct_complete") / 25) * 25)
                )
                pay_by_completion = pay_streams.groupBy("completion_band").agg(
                    F.mean(F.col("cost")).alias("avg_cost")
                )

            with (
                cf.describe(
                    "Customer watch-time export",
                    purpose="Per-customer total watch time for downstream use",
                    definition_owner="analytics-product",
                    risks=cf.risk("privacy.pii_exposure", mitigation="Hash customer names on export"),
                ),
                cf.force_profile(*PROF, only="last", cache=True),
            ):
                per_customer = (
                    uk_adults.join(streams, on="customer_id", how="inner")
                    .groupBy("customer_id")
                    .agg(F.sum(F.col("watch_minutes")).alias("total_watch_minutes"))
                )
                # NOTE(review): the risk above says customer names are hashed on
                # export, but full_name is selected unchanged here.
                export = uk_adults.join(per_customer, on="customer_id", how="inner").select(
                    "customer_id", "full_name", "region", "age", "total_watch_minutes"
                )
                export.write.mode("overwrite").csv(os.path.join(tmp, "customer_watch_time_spark"))

            # Several of these frames are not used again; binding them to `_`
            # keeps linters from flagging them as unused.
            _ = (watch_by_region, rating_by_movie, pay_by_type, pay_by_completion, export)
        finally:
            # Switch function tracking off even if a transformation raised.
            cf.track_functions(False)

        cf.to_html(path=out, title="Video streaming analytics (PySpark) — conformare report")
        m = cf.build_model(cf.store)
        # Report the real on-disk size. If cf.to_html returns the HTML as str,
        # len() of it counts characters, and the em dash in the title alone makes
        # that differ from the UTF-8 byte count.
        print(f"wrote {out} ({os.path.getsize(out):,} bytes)")
        print(
            f" nodes={m['stats']['nodes']}, events={len(cf.lineage())}, "
            f"columns={m['stats']['columns']}, risks={m['stats']['risks']}, "
            f"created_cols={len(m['columnLineage']['catalog'])}"
        )
        profiled = sum(1 for n in m["nodes"] if n["rows"] is not None)
        print(f" profiled nodes (force_profile only=last): {profiled}")
    return out
if __name__ == "__main__":
    # Script entry point: build the report at its default output location.
    main(out=None)