From 622a228fb789b7c93f8078694ec7e379b9f0a421 Mon Sep 17 00:00:00 2001 From: Valentin Heiserer Date: Thu, 11 Dec 2025 20:55:44 +0100 Subject: [PATCH] 12 --- Aufgabe 10/Aufgabe10.py | 1 - Aufgabe 11/Aufgabe11.py | 688 +++++++++++++++++++-------------------- Aufgabe 12/Aufgabe12.py | 276 ++++++++++++++++ Aufgabe 12/sparkstart.py | 22 ++ 4 files changed, 642 insertions(+), 345 deletions(-) create mode 100644 Aufgabe 12/Aufgabe12.py create mode 100644 Aufgabe 12/sparkstart.py diff --git a/Aufgabe 10/Aufgabe10.py b/Aufgabe 10/Aufgabe10.py index 0aa3b52..d7763f9 100644 --- a/Aufgabe 10/Aufgabe10.py +++ b/Aufgabe 10/Aufgabe10.py @@ -5,7 +5,6 @@ import matplotlib.pyplot as plt HDFSPATH = "hdfs://193.174.205.250:54310/" - def read_parquets(spark: SparkSession): stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet" products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet" diff --git a/Aufgabe 11/Aufgabe11.py b/Aufgabe 11/Aufgabe11.py index 82631cf..58d3db9 100644 --- a/Aufgabe 11/Aufgabe11.py +++ b/Aufgabe 11/Aufgabe11.py @@ -1,366 +1,366 @@ +from __future__ import annotations + from sparkstart import scon, spark from pyspark.sql import SparkSession -import time + import matplotlib.pyplot as plt -import pandas as pd - -HDFSPATH_STATIONS = "hdfs://193.174.205.250:54310/home/heiserervalentin/" -HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/" - -def init_view_stations(spark): - """Lädt die Stationsdaten für Aufgabe 11""" - s_path = HDFSPATH_STATIONS + "german_stations.parquet" - d_path = HDFSPATH_STATIONS + "german_stations_data.parquet" - - spark.read.parquet(s_path).createOrReplaceTempView("german_stations") - spark.read.parquet(d_path).createOrReplaceTempView("german_stations_data") - -def init_view_stocks(spark): - """Lädt die Stocks-Daten für Aufgabe 12""" - # Hinweis: Pfade anpassen, falls sie im Cluster anders liegen - spark.read.parquet(HDFSPATH_STOCKS + "stocks.parquet").createOrReplaceTempView("stocks") - spark.read.parquet(HDFSPATH_STOCKS + "portfolio.parquet").createOrReplaceTempView("portfolio") - -# --------------------------------------------------------- -# AUFGABE 11 -# --------------------------------------------------------- - -def task_11a_rollup(spark: SparkSession, station_name="Kempten"): - print(f"\n--- Aufgabe 11a: Rollup & Plotting für {station_name} ---") - start = time.time() - - # 1. Station ID finden - sid_df = spark.sql(f"SELECT stationId FROM german_stations WHERE station_name LIKE '%{station_name}%'") - try: - sid = sid_df.collect()[0]['stationId'] - except IndexError: - print(f"Station {station_name} nicht gefunden.") - return - - # 2. 
Rollup Query vorbereiten - q_prep = f""" - SELECT - YEAR(date) as yr, - QUARTER(date) as qt, - MONTH(date) as mo, - DAY(date) as da, - TT_TU - FROM german_stations_data - WHERE stationId = {sid} AND TT_TU IS NOT NULL AND TT_TU > -50 - """ - spark.sql(q_prep).createOrReplaceTempView("data_prep") - - q_rollup = """ - SELECT - yr, qt, mo, da, - MIN(TT_TU) as min_temp, - MAX(TT_TU) as max_temp, - AVG(TT_TU) as avg_temp, - - -- Datums-Konstruktion für Plots - DATE(STRING(yr) || '-' || STRING(qt*3-2) || '-01') as qt_date, - MAKE_DATE(yr, mo, 1) as mo_date, - MAKE_DATE(yr, 1, 1) as yr_date, - MAKE_DATE(yr, mo, da) as da_date - - FROM data_prep - GROUP BY ROLLUP(yr, qt, mo, da) - """ - - df_rollup = spark.sql(q_rollup) - df_rollup.cache() - df_rollup.createOrReplaceTempView("station_rollup") - - # Trigger Action for Cache & Time Measurement - count = df_rollup.count() - print(f"Rollup berechnet. Zeilen: {count}. Dauer: {time.time() - start:.2f}s") - input(">> 11a: Check Spark UI (Stages/Storage) jetzt. Enter für Plots...") - - # Plot 1: Tageswerte (letzte 3 Jahre der Daten) - q_days = """ - SELECT da_date as date, avg_temp - FROM station_rollup - WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NOT NULL - AND yr >= (SELECT MAX(yr) - 2 FROM station_rollup) - ORDER BY date - """ - pdf_days = spark.sql(q_days).toPandas() - - plt.figure(1, figsize=(10, 5)) - plt.plot(pdf_days['date'], pdf_days['avg_temp'], label='Daily Avg', linewidth=0.5) - plt.title(f"{station_name}: Daily Average (Last 3 Years)") - plt.xlabel('Date') - plt.ylabel('Temp °C') - plt.tight_layout() - plt.show() - - # Plot 2: Monatswerte (10-20 Jahre) - q_months = """ - SELECT mo_date as date, avg_temp - FROM station_rollup - WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NULL - AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) - ORDER BY date - """ - pdf_months = spark.sql(q_months).toPandas() - - plt.figure(2, figsize=(10, 5)) - plt.plot(pdf_months['date'], pdf_months['avg_temp'], color='green', label='Monthly Avg') - plt.title(f"{station_name}: Monthly Average (Last 20 Years)") - plt.xlabel('Date') - plt.ylabel('Temp °C') - plt.tight_layout() - plt.show() - - # Plot 3: Quartalswerte - q_quarters = """ - SELECT qt_date as date, avg_temp - FROM station_rollup - WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NULL AND da IS NULL - AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) - ORDER BY date - """ - pdf_quarters = spark.sql(q_quarters).toPandas() - - plt.figure(3, figsize=(10, 5)) - plt.plot(pdf_quarters['date'], pdf_quarters['avg_temp'], color='orange', marker='o', linestyle='-', label='Quarterly Avg') - plt.title(f"{station_name}: Quarterly Average (Last 20 Years)") - plt.show() - - # Plot 4: Jahreswerte - q_years = """ - SELECT yr_date as date, min_temp, max_temp, avg_temp - FROM station_rollup - WHERE yr IS NOT NULL AND qt IS NULL AND mo IS NULL AND da IS NULL - AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) - ORDER BY date - """ - pdf_years = spark.sql(q_years).toPandas() - - plt.figure(4, figsize=(10, 5)) - plt.plot(pdf_years['date'], pdf_years['max_temp'], color='red', label='Max') - plt.plot(pdf_years['date'], pdf_years['avg_temp'], color='black', label='Avg') - plt.plot(pdf_years['date'], pdf_years['min_temp'], color='blue', label='Min') - plt.title(f"{station_name}: Yearly Aggregates (Last 20 Years)") - plt.legend() - plt.show() -def task_11b_rank(spark: SparkSession): - print("\n--- Aufgabe 11b: TempMonat Ranking ---") - start = time.time() - - q_tempmonat = """ - 
SELECT - d.stationId, - s.station_name, - SUBSTR(CAST(d.date AS STRING), 1, 4) as year, - SUBSTR(CAST(d.date AS STRING), 6, 2) as month, - MIN(d.TT_TU) as min_t, - MAX(d.TT_TU) as max_t, - AVG(d.TT_TU) as avg_t - FROM german_stations_data d - JOIN german_stations s ON d.stationId = s.stationId - WHERE d.TT_TU IS NOT NULL AND d.TT_TU > -50 - GROUP BY d.stationId, s.station_name, year, month - """ - df_tm = spark.sql(q_tempmonat) - df_tm.createOrReplaceTempView("tempmonat") - - # 1. Ranking Partitioniert nach Monat im Jahr 2015 - print(" > Berechne Ranking für 2015 (partitioniert nach Monat)...") - q_rank_2015 = """ - SELECT - month, station_name, min_t, - RANK() OVER (PARTITION BY month ORDER BY min_t ASC) as rank_min, - RANK() OVER (PARTITION BY month ORDER BY max_t ASC) as rank_max, - RANK() OVER (PARTITION BY month ORDER BY avg_t ASC) as rank_avg - FROM tempmonat - WHERE year = '2015' - ORDER BY rank_min, month - """ - spark.sql(q_rank_2015).show(10) - - # 2. Globales Ranking (über alle Monate/Jahre hinweg) - print(" > Berechne Ranking global (kälteste Monate aller Zeiten)...") - q_rank_global = """ - SELECT - year, month, station_name, min_t, - RANK() OVER (ORDER BY min_t ASC) as rank_min, - RANK() OVER (ORDER BY max_t ASC) as rank_max, - RANK() OVER (ORDER BY avg_t ASC) as rank_avg - FROM tempmonat - ORDER BY rank_min - """ - spark.sql(q_rank_global).show(10) - - print(f"Dauer 11b: {time.time() - start:.2f}s") - input(">> 11b: Check Spark UI (Jobs/Stages). Enter...") +HDFSPATH = "hdfs://193.174.205.250:54310/" -def task_11c_groupingsets(spark: SparkSession): - print("\n--- Aufgabe 11c: Grouping Sets ---") - start = time.time() +def read_parquet_tables(spark: SparkSession) -> None: + """Load station master data and hourly measurements from parquet if needed.""" + stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet" + products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet" - q_prep = """ - SELECT - CAST(SUBSTR(CAST(d.date AS STRING), 1, 4) AS INT) as year, - CAST(SUBSTR(CAST(d.date AS STRING), 6, 2) AS INT) as month, - s.station_name, - s.bundesland, - d.TT_TU - FROM german_stations_data d - JOIN german_stations s ON d.stationId = s.stationId - WHERE d.TT_TU > -50 - """ - spark.sql(q_prep).createOrReplaceTempView("gs_base") + stations_df = spark.read.parquet(stations_path) + stations_df.createOrReplaceTempView("german_stations") + stations_df.cache() - q_sets = """ - SELECT - year, - month, - bundesland, - station_name, - MIN(TT_TU) as min_t, MAX(TT_TU) as max_t, AVG(TT_TU) as avg_t - FROM gs_base - GROUP BY GROUPING SETS ( - (year, bundesland), - (year, station_name), - (month, bundesland) - ) - """ - df_gs = spark.sql(q_sets) - df_gs.cache() - df_gs.createOrReplaceTempView("grouping_result") - - # Action zum Cachen - df_gs.count() - print(f"Grouping Sets berechnet. 
Dauer: {time.time() - start:.2f}s") - - print("Auswahl 1: Jahr & Bundesland") - spark.sql("SELECT year, bundesland, avg_t FROM grouping_result WHERE station_name IS NULL AND month IS NULL ORDER BY year DESC, bundesland").show(5) - - print("Auswahl 2: Jahr & Station") - spark.sql("SELECT year, station_name, avg_t FROM grouping_result WHERE bundesland IS NULL AND month IS NULL ORDER BY year DESC, station_name").show(5) - - print("Auswahl 3: Monat & Bundesland (Jahreszeitlicher Verlauf je Land)") - spark.sql("SELECT month, bundesland, avg_t FROM grouping_result WHERE year IS NULL AND station_name IS NULL ORDER BY bundesland, month").show(5) - - input(">> 11c: Check Spark UI (Zugriffspläne/Storage). Enter...") + products_df = spark.read.parquet(products_path) + products_df.createOrReplaceTempView("german_stations_data") + products_df.cache() -# --------------------------------------------------------- -# AUFGABE 12 -# --------------------------------------------------------- +def _escape_like(value: str) -> str: + """Escape single quotes for safe SQL literal usage.""" + return value.replace("'", "''") -def task_12_stocks_analysis(spark: SparkSession): - print("\n--- Aufgabe 12: Stocks & Portfolio ---") - - # a) Erstes und letztes Datum je Symbol - print("a) Min/Max Datum pro Symbol") - t0 = time.time() - q_a = """ - SELECT symbol, MIN(date) as first_date, MAX(date) as last_date - FROM stocks - GROUP BY symbol - ORDER BY symbol - """ - spark.sql(q_a).show(5) - print(f"Zeit a): {time.time()-t0:.2f}s") - # b) Aggregationen 2009 - print("\nb) High/Low/Avg Close 2009") - t0 = time.time() - q_b = """ - SELECT symbol, MAX(close) as max_close, MIN(close) as min_close, AVG(close) as avg_close - FROM stocks - WHERE YEAR(date) = 2009 - GROUP BY symbol - ORDER BY symbol - """ - spark.sql(q_b).show(5) - print(f"Zeit b): {time.time()-t0:.2f}s") +def resolve_station_id(spark: SparkSession, station_identifier) -> int: + """Resolve station id either from int input or fuzzy name search.""" + if isinstance(station_identifier, int): + return station_identifier + if isinstance(station_identifier, str) and station_identifier.strip().isdigit(): + return int(station_identifier.strip()) + if isinstance(station_identifier, str): + needle = _escape_like(station_identifier.lower()) + q = ( + "SELECT stationId FROM german_stations " + f"WHERE lower(station_name) LIKE '%{needle}%' ORDER BY station_name LIMIT 1" + ) + result = spark.sql(q).collect() + if not result: + raise ValueError(f"No station found for pattern '{station_identifier}'") + return int(result[0]["stationId"]) + raise ValueError("station_identifier must be int or str") - # c) Lateral View (Explode Portfolio) - print("\nc) Lateral View: Aktien in Portfolios") - t0 = time.time() - - q_c = """ - SELECT - h.symbol, - SUM(h.amount) as total_shares, - COUNT(p.portfolioId) as num_portfolios, - AVG(h.amount) as avg_per_portfolio - FROM portfolio p - LATERAL VIEW explode(holdings) t AS h - GROUP BY h.symbol - ORDER BY h.symbol - """ - spark.sql(q_c).show(5) - print(f"Zeit c): {time.time()-t0:.2f}s") - # d) Symbole in keinem Portfolio (Anti Join) - print("\nd) Symbole ohne Portfolio") - t0 = time.time() - q_d = """ - SELECT DISTINCT s.symbol - FROM stocks s - LEFT ANTI JOIN ( - SELECT DISTINCT h.symbol - FROM portfolio p - LATERAL VIEW explode(holdings) t AS h - ) p_sym ON s.symbol = p_sym.symbol - ORDER BY s.symbol - """ - spark.sql(q_d).show(5) - print(f"Zeit d): {time.time()-t0:.2f}s") - - input(">> 12 a-d fertig. Check UI. 
Enter für e)...") +def build_station_rollup_for_station(spark: SparkSession, station_identifier) -> None: + """Create rollup view with min/max/avg per hour/day/month/quarter/year.""" + station_id = resolve_station_id(spark, station_identifier) + q = f""" + WITH base AS ( + SELECT + d.stationId, + gs.station_name, + TO_TIMESTAMP(CONCAT(d.date, LPAD(CAST(d.hour AS STRING), 2, '0')), 'yyyyMMddHH') AS hour_ts, + TO_DATE(d.date, 'yyyyMMdd') AS day_date, + MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS month_in_year, + QUARTER(TO_DATE(d.date, 'yyyyMMdd')) AS quarter_in_year, + YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS year_value, + d.TT_TU AS temperature + FROM german_stations_data d + JOIN german_stations gs ON d.stationId = gs.stationId + WHERE d.stationId = {station_id} + AND d.TT_TU IS NOT NULL + AND d.TT_TU <> -999 + ), + rollup_base AS ( + SELECT + stationId, + station_name, + hour_ts, + day_date, + month_in_year, + quarter_in_year, + year_value, + MIN(temperature) AS min_temp, + MAX(temperature) AS max_temp, + AVG(temperature) AS avg_temp + FROM base + GROUP BY stationId, station_name, ROLLUP(year_value, quarter_in_year, month_in_year, day_date, hour_ts) + ) + SELECT + stationId, + station_name, + hour_ts, + day_date, + month_in_year, + quarter_in_year, + year_value, + CASE WHEN month_in_year IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-', LPAD(CAST(month_in_year AS STRING), 2, '0'), '-01')) END AS month_start_date, + CASE WHEN quarter_in_year IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-', LPAD(CAST(quarter_in_year * 3 - 2 AS STRING), 2, '0'), '-01')) END AS quarter_start_date, + CASE WHEN year_value IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-01-01')) END AS year_start_date, + min_temp, + max_temp, + avg_temp + FROM rollup_base + """ + rollup_df = spark.sql(q) + rollup_df.cache() + rollup_df.createOrReplaceTempView("station_rollup") - # e) Portfolio Wert Ende 2010 - print("\ne) Portfolio Bewertung Ende 2010") - t0 = time.time() - - q_last_price = """ - SELECT symbol, close - FROM ( - SELECT - symbol, - close, - ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY date DESC) as rn - FROM stocks - WHERE YEAR(date) = 2010 - ) tmp - WHERE rn = 1 - """ - spark.sql(q_last_price).createOrReplaceTempView("stocks_2010_end") - # Schritt 2: Portfolio explodieren, mit Preis joinen, berechnen, summieren - q_val = """ - SELECT - p.portfolioId, - SUM(h.amount * s.close) as portfolio_value_2010 - FROM portfolio p - LATERAL VIEW explode(holdings) t AS h - JOIN stocks_2010_end s ON h.symbol = s.symbol - GROUP BY p.portfolioId - ORDER BY p.portfolioId - """ - spark.sql(q_val).show(5) - print(f"Zeit e): {time.time()-t0:.2f}s") +def _year_window(spark: SparkSession, years_back: int, station_id: int) -> tuple[int, int] | None: + stats = spark.sql( + f"SELECT MIN(year_value) AS min_year, MAX(year_value) AS max_year FROM station_rollup WHERE year_value IS NOT NULL AND stationId = {station_id}" + ).collect() + if not stats or stats[0]["max_year"] is None: + return None + min_year = int(stats[0]["min_year"]) + max_year = int(stats[0]["max_year"]) + start_year = max(min_year, max_year - years_back + 1) + return start_year, max_year + + +def plot_station_rollup_levels( + spark: SparkSession, + station_identifier, + day_span_years: int = 3, + agg_span_years: int = 15, +) -> None: + """Plot day, month, quarter, and year aggregates for the given station.""" + station_id = resolve_station_id(spark, station_identifier) + needs_refresh = not spark.catalog.tableExists("station_rollup") + 
if not needs_refresh: + count = spark.sql( + f"SELECT COUNT(*) AS cnt FROM station_rollup WHERE stationId = {station_id}" + ).collect()[0]["cnt"] + needs_refresh = count == 0 + if needs_refresh: + build_station_rollup_for_station(spark, station_id) + + day_window = _year_window(spark, day_span_years, station_id) + if day_window is None: + print("No data available for plotting") + return + month_window = _year_window(spark, agg_span_years, station_id) + if month_window is None: + print("No aggregated window available") + return + + def _plot(query: str, figure_idx: int, title: str, x_col: str = "bucket_date") -> None: + pdf = spark.sql(query).toPandas() + if pdf.empty: + print(f"No data for {title}") + return + plt.figure(num=figure_idx) + plt.clf() + metrics = [ + ("min_temp", "Min", "#1f77b4"), + ("avg_temp", "Avg", "#ff7f0e"), + ("max_temp", "Max", "#2ca02c"), + ] + for col, label, color in metrics: + if col in pdf: + plt.plot(pdf[x_col], pdf[col], label=label, color=color) + plt.title(title) + plt.xlabel("Datum") + plt.ylabel("Temperatur (°C)") + plt.legend() + plt.tight_layout() + plt.show() + + day_start, day_end = day_window + q_day = f""" + SELECT day_date AS bucket_date, min_temp, avg_temp, max_temp + FROM station_rollup + WHERE stationId = {station_id} + AND hour_ts IS NULL + AND day_date IS NOT NULL + AND year_value BETWEEN {day_start} AND {day_end} + ORDER BY bucket_date + """ + _plot(q_day, 1, f"Tagesmittelwerte {day_start}-{day_end}") + + agg_start, agg_end = month_window + q_month = f""" + SELECT month_start_date AS bucket_date, min_temp, avg_temp, max_temp + FROM station_rollup + WHERE stationId = {station_id} + AND day_date IS NULL + AND month_in_year IS NOT NULL + AND year_value BETWEEN {agg_start} AND {agg_end} + ORDER BY bucket_date + """ + _plot(q_month, 2, f"Monatsmittelwerte {agg_start}-{agg_end}") + + q_quarter = f""" + SELECT quarter_start_date AS bucket_date, min_temp, avg_temp, max_temp + FROM station_rollup + WHERE stationId = {station_id} + AND month_in_year IS NULL + AND quarter_in_year IS NOT NULL + AND year_value BETWEEN {agg_start} AND {agg_end} + ORDER BY bucket_date + """ + _plot(q_quarter, 3, f"Quartalsmittelwerte {agg_start}-{agg_end}") + + q_year = f""" + SELECT year_start_date AS bucket_date, min_temp, avg_temp, max_temp + FROM station_rollup + WHERE stationId = {station_id} + AND quarter_in_year IS NULL + AND year_value IS NOT NULL + ORDER BY bucket_date + """ + _plot(q_year, 4, "Jahresmittelwerte") + + +def create_tempmonat(spark: SparkSession) -> None: + """Create cached temp table tempmonat with monthly aggregates per station.""" + q = """ + SELECT + d.stationId, + gs.station_name, + YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS year_value, + MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS month_value, + MIN(d.TT_TU) AS min_temp, + MAX(d.TT_TU) AS max_temp, + AVG(d.TT_TU) AS avg_temp + FROM german_stations_data d + JOIN german_stations gs ON d.stationId = gs.stationId + WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999 + GROUP BY d.stationId, gs.station_name, YEAR(TO_DATE(d.date, 'yyyyMMdd')), MONTH(TO_DATE(d.date, 'yyyyMMdd')) + """ + monthly_df = spark.sql(q) + monthly_df.cache() + monthly_df.createOrReplaceTempView("tempmonat") + + +def rank_coldest_per_month_2015(spark: SparkSession): + """Rank stations by coldest values per month for 2015 using tempmonat.""" + return spark.sql( + """ + SELECT + stationId, + station_name, + year_value, + month_value, + min_temp, + max_temp, + avg_temp, + RANK() OVER (PARTITION BY month_value ORDER BY min_temp ASC) AS rank_min, + 
RANK() OVER (PARTITION BY month_value ORDER BY max_temp ASC) AS rank_max, + RANK() OVER (PARTITION BY month_value ORDER BY avg_temp ASC) AS rank_avg + FROM tempmonat + WHERE year_value = 2015 + ORDER BY rank_min, month_value + """ + ) + + +def rank_coldest_overall(spark: SparkSession): + """Rank stations by coldest values over all months/years (no partition).""" + return spark.sql( + """ + SELECT + stationId, + station_name, + year_value, + month_value, + min_temp, + max_temp, + avg_temp, + RANK() OVER (ORDER BY min_temp ASC) AS rank_min, + RANK() OVER (ORDER BY max_temp ASC) AS rank_max, + RANK() OVER (ORDER BY avg_temp ASC) AS rank_avg + FROM tempmonat + ORDER BY rank_min + """ + ) + + +def create_grouping_sets_overview(spark: SparkSession) -> None: + """Compute grouping sets for requested aggregations and cache the result.""" + q = """ + WITH base AS ( + SELECT + YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS year_value, + MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS month_value, + gs.bundesland, + gs.stationId, + gs.station_name, + d.TT_TU AS temperature + FROM german_stations_data d + JOIN german_stations gs ON d.stationId = gs.stationId + WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999 + ) + SELECT + year_value, + month_value, + bundesland, + stationId, + station_name, + MIN(temperature) AS min_temp, + MAX(temperature) AS max_temp, + AVG(temperature) AS avg_temp + FROM base + GROUP BY GROUPING SETS ( + (year_value, bundesland), + (year_value, stationId, station_name, bundesland), + (month_value, bundesland) + ) + """ + grouped_df = spark.sql(q) + grouped_df.cache() + grouped_df.createOrReplaceTempView("grouping_sets_stats") + + +def select_year_bundesland(spark: SparkSession): + return spark.sql( + """ + SELECT year_value, bundesland, min_temp, max_temp, avg_temp + FROM grouping_sets_stats + WHERE bundesland IS NOT NULL AND month_value IS NULL AND stationId IS NULL + ORDER BY year_value, bundesland + """ + ) + + +def select_year_station(spark: SparkSession): + return spark.sql( + """ + SELECT year_value, stationId, station_name, min_temp, max_temp, avg_temp + FROM grouping_sets_stats + WHERE stationId IS NOT NULL AND month_value IS NULL + ORDER BY year_value, stationId + """ + ) + + +def select_month_bundesland(spark: SparkSession): + return spark.sql( + """ + SELECT month_value, bundesland, min_temp, max_temp, avg_temp + FROM grouping_sets_stats + WHERE month_value IS NOT NULL AND year_value IS NULL + ORDER BY month_value, bundesland + """ + ) def main(scon, spark): - init_view_stations(spark) - - # Aufgabe 11 - task_11a_rollup(spark, station_name="Kempten") - task_11b_rank(spark) - task_11c_groupingsets(spark) + read_parquet_tables(spark) + build_station_rollup_for_station(spark, "kempten") + plot_station_rollup_levels(spark, "kempten") - # Aufgabe 12 - init_view_stocks(spark) - task_12_stocks_analysis(spark) + create_tempmonat(spark) + print("Rangfolgen 2015 je Monat:") + rank_coldest_per_month_2015(spark).show(36, truncate=False) + print("Rangfolgen gesamt:") + rank_coldest_overall(spark).show(36, truncate=False) -if __name__ == '__main__': - main(scon, spark) \ No newline at end of file + create_grouping_sets_overview(spark) + print("Jahr vs Bundesland:") + select_year_bundesland(spark).show(20, truncate=False) + print("Jahr vs Station:") + select_year_station(spark).show(20, truncate=False) + print("Monat vs Bundesland:") + select_month_bundesland(spark).show(20, truncate=False) + + +if __name__ == "__main__": + main(scon, spark) diff --git a/Aufgabe 12/Aufgabe12.py b/Aufgabe 12/Aufgabe12.py new 
file mode 100644 index 0000000..dd9531f --- /dev/null +++ b/Aufgabe 12/Aufgabe12.py @@ -0,0 +1,276 @@ +from __future__ import annotations + +from typing import Iterable, Sequence + +from pyspark.sql import SparkSession, functions as F, types as T + +from sparkstart import scon, spark + + +HDFSPATH = "hdfs://193.174.205.250:54310/" + + +_DATE_FALLBACK_EXPR = "COALESCE(date_value, TO_DATE(date_str), TO_DATE(date_str, 'yyyyMMdd'))" + + +def _resolve_column_name(columns: Sequence[str], candidates: Iterable[str]) -> str: + + lowered = {col.lower(): col for col in columns} + for candidate in candidates: + match = lowered.get(candidate.lower()) + if match: + return match + raise ValueError(f"None of the candidate columns {list(candidates)} exist in {columns}") + + +def _normalize_stocks_view(spark: SparkSession) -> None: + + stocks_path = HDFSPATH + "stocks/stocks.parquet" + stocks_df = spark.read.parquet(stocks_path) + + symbol_col = _resolve_column_name(stocks_df.columns, ("symbol", "ticker")) + date_col = _resolve_column_name(stocks_df.columns, ("date", "pricedate", "dt")) + close_col = _resolve_column_name(stocks_df.columns, ("close", "closeprice", "closingprice")) + + stocks_df = ( + stocks_df + .select( + F.col(symbol_col).alias("symbol"), + F.col(date_col).alias("raw_date"), + F.col(close_col).alias("close_raw"), + ) + .withColumn("date_str", F.col("raw_date").cast("string")) + ) + + date_candidates = [ + F.col("raw_date").cast("date"), + F.to_date("raw_date"), + F.to_date("date_str"), + F.to_date("date_str", "yyyyMMdd"), + F.to_date("date_str", "MM/dd/yyyy"), + ] + + stocks_df = ( + stocks_df + .withColumn("date_value", F.coalesce(*date_candidates)) + .withColumn("year_value", F.substring("date_str", 1, 4).cast("int")) + .withColumn("close_value", F.col("close_raw").cast("double")) + .select("symbol", "date_value", "date_str", "year_value", "close_value") + ) + + stocks_df.cache() + stocks_df.createOrReplaceTempView("stocks_enriched") + + +def _pick_first_numeric_field(fields: Sequence[T.StructField]) -> str: + + numeric_types = ( + T.ByteType, + T.ShortType, + T.IntegerType, + T.LongType, + T.FloatType, + T.DoubleType, + T.DecimalType, + ) + for field in fields: + if isinstance(field.dataType, numeric_types): + return field.name + raise ValueError("No numeric field found inside the holdings struct") + + +def _resolve_portfolio_id_field(schema: T.StructType) -> str: + + priority = ("portfolio_id", "portfolioid", "id") + lowered = {field.name.lower(): field.name for field in schema.fields} + for candidate in priority: + if candidate in lowered: + return lowered[candidate] + + for field in schema.fields: + if not isinstance(field.dataType, (T.ArrayType, T.MapType)): + return field.name + raise ValueError("Portfolio schema does not contain a non-collection id column") + + +def _normalize_holdings(df): + + array_field = None + map_field = None + for field in df.schema.fields: + if isinstance(field.dataType, T.ArrayType) and isinstance(field.dataType.elementType, T.StructType): + array_field = field + break + if isinstance(field.dataType, T.MapType) and isinstance(field.dataType.keyType, T.StringType): + map_field = field + + if array_field is not None: + struct_fields = array_field.dataType.elementType.fields + symbol_field = _resolve_column_name([f.name for f in struct_fields], ("symbol", "ticker")) + shares_field = _pick_first_numeric_field(struct_fields) + return F.expr( + f"transform(`{array_field.name}`, x -> named_struct('symbol', x.`{symbol_field}`, 'shares', CAST(x.`{shares_field}` AS 
DOUBLE)))" + ) + + if map_field is not None and isinstance(map_field.dataType.valueType, (T.IntegerType, T.LongType, T.FloatType, T.DoubleType, T.DecimalType)): + return F.expr( + f"transform(map_entries(`{map_field.name}`), x -> named_struct('symbol', x.key, 'shares', CAST(x.value AS DOUBLE)))" + ) + + raise ValueError("Could not locate holdings column (array or map) in portfolio data") + + +def _normalize_portfolio_view(spark: SparkSession) -> None: + + portfolio_path = HDFSPATH + "stocks/portfolio.parquet" + portfolio_df = spark.read.parquet(portfolio_path) + + id_col = _resolve_portfolio_id_field(portfolio_df.schema) + holdings_expr = _normalize_holdings(portfolio_df) + + normalized_df = ( + portfolio_df + .select( + F.col(id_col).alias("portfolio_id"), + holdings_expr.alias("holdings"), + ) + ) + + normalized_df.cache() + normalized_df.createOrReplaceTempView("portfolio") + + spark.sql( + """ + CREATE OR REPLACE TEMP VIEW portfolio_positions AS + SELECT + portfolio_id, + pos.symbol AS symbol, + pos.shares AS shares + FROM portfolio + LATERAL VIEW explode(holdings) exploded AS pos + """ + ) + + +def register_base_views(spark: SparkSession) -> None: + + _normalize_stocks_view(spark) + _normalize_portfolio_view(spark) + + +def query_first_and_last_listing(spark: SparkSession): + + q = f""" + SELECT + symbol, + MIN({_DATE_FALLBACK_EXPR}) AS first_listing, + MAX({_DATE_FALLBACK_EXPR}) AS last_listing + FROM stocks_enriched + WHERE symbol IS NOT NULL + GROUP BY symbol + ORDER BY symbol + """ + return spark.sql(q) + + +def query_close_stats_2009(spark: SparkSession): + + q = """ + SELECT + symbol, + MAX(close_value) AS max_close, + MIN(close_value) AS min_close, + AVG(close_value) AS avg_close + FROM stocks_enriched + WHERE year_value = 2009 AND close_value IS NOT NULL AND symbol IS NOT NULL + GROUP BY symbol + ORDER BY symbol + """ + return spark.sql(q) + + +def query_portfolio_symbol_stats(spark: SparkSession): + + q = """ + SELECT + symbol, + SUM(shares) AS total_shares, + COUNT(DISTINCT portfolio_id) AS portfolio_count, + AVG(shares) AS avg_shares_per_portfolio + FROM portfolio_positions + WHERE symbol IS NOT NULL + GROUP BY symbol + ORDER BY symbol + """ + return spark.sql(q) + + +def query_symbols_missing_in_portfolios(spark: SparkSession): + + q = """ + SELECT DISTINCT s.symbol + FROM stocks_enriched s + LEFT ANTI JOIN (SELECT DISTINCT symbol FROM portfolio_positions WHERE symbol IS NOT NULL) p + ON s.symbol = p.symbol + WHERE s.symbol IS NOT NULL + ORDER BY s.symbol + """ + return spark.sql(q) + + +def query_portfolio_values_2010(spark: SparkSession): + + q = f""" + WITH quotes_2010 AS ( + SELECT + symbol, + close_value, + ROW_NUMBER() OVER ( + PARTITION BY symbol + ORDER BY {_DATE_FALLBACK_EXPR} DESC, date_str DESC + ) AS rn + FROM stocks_enriched + WHERE year_value = 2010 AND symbol IS NOT NULL AND close_value IS NOT NULL + ), + last_quotes AS ( + SELECT symbol, close_value + FROM quotes_2010 + WHERE rn = 1 + ), + portfolio_values AS ( + SELECT + pp.portfolio_id, + SUM(pp.shares * lq.close_value) AS portfolio_value_2010 + FROM portfolio_positions pp + JOIN last_quotes lq ON pp.symbol = lq.symbol + GROUP BY pp.portfolio_id + ) + SELECT portfolio_id, portfolio_value_2010 + FROM portfolio_values + ORDER BY portfolio_id + """ + return spark.sql(q) + + +def main(scon, spark): + + register_base_views(spark) + + print("(a) Erste und letzte Notierung je Symbol:") + query_first_and_last_listing(spark).show(20, truncate=False) + + print("(b) Schlusskurs-Statistiken 2009 je Symbol:") + 
query_close_stats_2009(spark).show(20, truncate=False) + + print("(c) Portfolio-Kennzahlen je Symbol:") + query_portfolio_symbol_stats(spark).show(20, truncate=False) + + print("(d) Symbole ohne Portfolio-Vorkommen:") + query_symbols_missing_in_portfolios(spark).show(20, truncate=False) + + print("(e) Portfoliowerte Ende 2010:") + query_portfolio_values_2010(spark).show(20, truncate=False) + + +if __name__ == "__main__": + main(scon, spark) diff --git a/Aufgabe 12/sparkstart.py b/Aufgabe 12/sparkstart.py new file mode 100644 index 0000000..bdb7010 --- /dev/null +++ b/Aufgabe 12/sparkstart.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- + +""" +Erzeugen einer Spark-Konfiguration +""" + +from pyspark import SparkConf, SparkContext +from pyspark.sql import SparkSession + +# connect to cluster +conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin") +conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") +conf.set("spark.executor.memory", '32g') +conf.set("spark.driver.memory", '8g') +conf.set("spark.cores.max", "40") +scon = SparkContext(conf=conf) + + +spark = SparkSession \ + .builder \ + .appName("Python Spark SQL") \ + .getOrCreate()
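
A minimal local sketch of the LATERAL VIEW explode pattern that the portfolio_positions view in Aufgabe12.py is built on, assuming holdings arrives as an array of structs with a symbol and a numeric share count (the shape _normalize_holdings produces); the local[*] session and the sample rows are invented purely for illustration.

# Sketch: flatten an array-of-structs holdings column into one row per position,
# mirroring the portfolio_positions temp view. Sample data and local session are
# assumptions for illustration, not part of the cluster setup above.
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.master("local[*]").appName("ExplodeSketch").getOrCreate()

spark.createDataFrame([
    Row(portfolio_id=1, holdings=[Row(symbol="AAA", shares=100.0),
                                  Row(symbol="BBB", shares=10.0)]),
    Row(portfolio_id=2, holdings=[Row(symbol="AAA", shares=25.0)]),
]).createOrReplaceTempView("portfolio")

spark.sql("""
    SELECT portfolio_id, pos.symbol AS symbol, pos.shares AS shares
    FROM portfolio
    LATERAL VIEW explode(holdings) exploded AS pos
""").show()
# Yields one row per position: (1, AAA, 100.0), (1, BBB, 10.0), (2, AAA, 25.0)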