From 2b2618864774443fd8d1491c6c0dd22e5405b2e9 Mon Sep 17 00:00:00 2001 From: Valentin Heiserer Date: Thu, 11 Dec 2025 20:46:38 +0100 Subject: [PATCH] Aufgabe11 --- Aufgabe 11/Aufgabe11.py | 366 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 366 insertions(+) create mode 100644 Aufgabe 11/Aufgabe11.py diff --git a/Aufgabe 11/Aufgabe11.py b/Aufgabe 11/Aufgabe11.py new file mode 100644 index 0000000..82631cf --- /dev/null +++ b/Aufgabe 11/Aufgabe11.py @@ -0,0 +1,366 @@ +from sparkstart import scon, spark +from pyspark.sql import SparkSession +import time +import matplotlib.pyplot as plt +import pandas as pd + +HDFSPATH_STATIONS = "hdfs://193.174.205.250:54310/home/heiserervalentin/" +HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/" + +def init_view_stations(spark): + """Lädt die Stationsdaten für Aufgabe 11""" + s_path = HDFSPATH_STATIONS + "german_stations.parquet" + d_path = HDFSPATH_STATIONS + "german_stations_data.parquet" + + spark.read.parquet(s_path).createOrReplaceTempView("german_stations") + spark.read.parquet(d_path).createOrReplaceTempView("german_stations_data") + +def init_view_stocks(spark): + """Lädt die Stocks-Daten für Aufgabe 12""" + # Hinweis: Pfade anpassen, falls sie im Cluster anders liegen + spark.read.parquet(HDFSPATH_STOCKS + "stocks.parquet").createOrReplaceTempView("stocks") + spark.read.parquet(HDFSPATH_STOCKS + "portfolio.parquet").createOrReplaceTempView("portfolio") + +# --------------------------------------------------------- +# AUFGABE 11 +# --------------------------------------------------------- + +def task_11a_rollup(spark: SparkSession, station_name="Kempten"): + print(f"\n--- Aufgabe 11a: Rollup & Plotting für {station_name} ---") + start = time.time() + + # 1. Station ID finden + sid_df = spark.sql(f"SELECT stationId FROM german_stations WHERE station_name LIKE '%{station_name}%'") + try: + sid = sid_df.collect()[0]['stationId'] + except IndexError: + print(f"Station {station_name} nicht gefunden.") + return + + # 2. Rollup Query vorbereiten + q_prep = f""" + SELECT + YEAR(date) as yr, + QUARTER(date) as qt, + MONTH(date) as mo, + DAY(date) as da, + TT_TU + FROM german_stations_data + WHERE stationId = {sid} AND TT_TU IS NOT NULL AND TT_TU > -50 + """ + spark.sql(q_prep).createOrReplaceTempView("data_prep") + + q_rollup = """ + SELECT + yr, qt, mo, da, + MIN(TT_TU) as min_temp, + MAX(TT_TU) as max_temp, + AVG(TT_TU) as avg_temp, + + -- Datums-Konstruktion für Plots + DATE(STRING(yr) || '-' || STRING(qt*3-2) || '-01') as qt_date, + MAKE_DATE(yr, mo, 1) as mo_date, + MAKE_DATE(yr, 1, 1) as yr_date, + MAKE_DATE(yr, mo, da) as da_date + + FROM data_prep + GROUP BY ROLLUP(yr, qt, mo, da) + """ + + df_rollup = spark.sql(q_rollup) + df_rollup.cache() + df_rollup.createOrReplaceTempView("station_rollup") + + # Trigger Action for Cache & Time Measurement + count = df_rollup.count() + print(f"Rollup berechnet. Zeilen: {count}. Dauer: {time.time() - start:.2f}s") + input(">> 11a: Check Spark UI (Stages/Storage) jetzt. Enter für Plots...") + + # Plot 1: Tageswerte (letzte 3 Jahre der Daten) + q_days = """ + SELECT da_date as date, avg_temp + FROM station_rollup + WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NOT NULL + AND yr >= (SELECT MAX(yr) - 2 FROM station_rollup) + ORDER BY date + """ + pdf_days = spark.sql(q_days).toPandas() + + plt.figure(1, figsize=(10, 5)) + plt.plot(pdf_days['date'], pdf_days['avg_temp'], label='Daily Avg', linewidth=0.5) + plt.title(f"{station_name}: Daily Average (Last 3 Years)") + plt.xlabel('Date') + plt.ylabel('Temp °C') + plt.tight_layout() + plt.show() + + # Plot 2: Monatswerte (10-20 Jahre) + q_months = """ + SELECT mo_date as date, avg_temp + FROM station_rollup + WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NULL + AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) + ORDER BY date + """ + pdf_months = spark.sql(q_months).toPandas() + + plt.figure(2, figsize=(10, 5)) + plt.plot(pdf_months['date'], pdf_months['avg_temp'], color='green', label='Monthly Avg') + plt.title(f"{station_name}: Monthly Average (Last 20 Years)") + plt.xlabel('Date') + plt.ylabel('Temp °C') + plt.tight_layout() + plt.show() + + # Plot 3: Quartalswerte + q_quarters = """ + SELECT qt_date as date, avg_temp + FROM station_rollup + WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NULL AND da IS NULL + AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) + ORDER BY date + """ + pdf_quarters = spark.sql(q_quarters).toPandas() + + plt.figure(3, figsize=(10, 5)) + plt.plot(pdf_quarters['date'], pdf_quarters['avg_temp'], color='orange', marker='o', linestyle='-', label='Quarterly Avg') + plt.title(f"{station_name}: Quarterly Average (Last 20 Years)") + plt.show() + + # Plot 4: Jahreswerte + q_years = """ + SELECT yr_date as date, min_temp, max_temp, avg_temp + FROM station_rollup + WHERE yr IS NOT NULL AND qt IS NULL AND mo IS NULL AND da IS NULL + AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) + ORDER BY date + """ + pdf_years = spark.sql(q_years).toPandas() + + plt.figure(4, figsize=(10, 5)) + plt.plot(pdf_years['date'], pdf_years['max_temp'], color='red', label='Max') + plt.plot(pdf_years['date'], pdf_years['avg_temp'], color='black', label='Avg') + plt.plot(pdf_years['date'], pdf_years['min_temp'], color='blue', label='Min') + plt.title(f"{station_name}: Yearly Aggregates (Last 20 Years)") + plt.legend() + plt.show() + + +def task_11b_rank(spark: SparkSession): + print("\n--- Aufgabe 11b: TempMonat Ranking ---") + start = time.time() + + q_tempmonat = """ + SELECT + d.stationId, + s.station_name, + SUBSTR(CAST(d.date AS STRING), 1, 4) as year, + SUBSTR(CAST(d.date AS STRING), 6, 2) as month, + MIN(d.TT_TU) as min_t, + MAX(d.TT_TU) as max_t, + AVG(d.TT_TU) as avg_t + FROM german_stations_data d + JOIN german_stations s ON d.stationId = s.stationId + WHERE d.TT_TU IS NOT NULL AND d.TT_TU > -50 + GROUP BY d.stationId, s.station_name, year, month + """ + df_tm = spark.sql(q_tempmonat) + df_tm.createOrReplaceTempView("tempmonat") + + # 1. Ranking Partitioniert nach Monat im Jahr 2015 + print(" > Berechne Ranking für 2015 (partitioniert nach Monat)...") + q_rank_2015 = """ + SELECT + month, station_name, min_t, + RANK() OVER (PARTITION BY month ORDER BY min_t ASC) as rank_min, + RANK() OVER (PARTITION BY month ORDER BY max_t ASC) as rank_max, + RANK() OVER (PARTITION BY month ORDER BY avg_t ASC) as rank_avg + FROM tempmonat + WHERE year = '2015' + ORDER BY rank_min, month + """ + spark.sql(q_rank_2015).show(10) + + # 2. Globales Ranking (über alle Monate/Jahre hinweg) + print(" > Berechne Ranking global (kälteste Monate aller Zeiten)...") + q_rank_global = """ + SELECT + year, month, station_name, min_t, + RANK() OVER (ORDER BY min_t ASC) as rank_min, + RANK() OVER (ORDER BY max_t ASC) as rank_max, + RANK() OVER (ORDER BY avg_t ASC) as rank_avg + FROM tempmonat + ORDER BY rank_min + """ + spark.sql(q_rank_global).show(10) + + print(f"Dauer 11b: {time.time() - start:.2f}s") + input(">> 11b: Check Spark UI (Jobs/Stages). Enter...") + + +def task_11c_groupingsets(spark: SparkSession): + print("\n--- Aufgabe 11c: Grouping Sets ---") + start = time.time() + + q_prep = """ + SELECT + CAST(SUBSTR(CAST(d.date AS STRING), 1, 4) AS INT) as year, + CAST(SUBSTR(CAST(d.date AS STRING), 6, 2) AS INT) as month, + s.station_name, + s.bundesland, + d.TT_TU + FROM german_stations_data d + JOIN german_stations s ON d.stationId = s.stationId + WHERE d.TT_TU > -50 + """ + spark.sql(q_prep).createOrReplaceTempView("gs_base") + + q_sets = """ + SELECT + year, + month, + bundesland, + station_name, + MIN(TT_TU) as min_t, MAX(TT_TU) as max_t, AVG(TT_TU) as avg_t + FROM gs_base + GROUP BY GROUPING SETS ( + (year, bundesland), + (year, station_name), + (month, bundesland) + ) + """ + df_gs = spark.sql(q_sets) + df_gs.cache() + df_gs.createOrReplaceTempView("grouping_result") + + # Action zum Cachen + df_gs.count() + print(f"Grouping Sets berechnet. Dauer: {time.time() - start:.2f}s") + + print("Auswahl 1: Jahr & Bundesland") + spark.sql("SELECT year, bundesland, avg_t FROM grouping_result WHERE station_name IS NULL AND month IS NULL ORDER BY year DESC, bundesland").show(5) + + print("Auswahl 2: Jahr & Station") + spark.sql("SELECT year, station_name, avg_t FROM grouping_result WHERE bundesland IS NULL AND month IS NULL ORDER BY year DESC, station_name").show(5) + + print("Auswahl 3: Monat & Bundesland (Jahreszeitlicher Verlauf je Land)") + spark.sql("SELECT month, bundesland, avg_t FROM grouping_result WHERE year IS NULL AND station_name IS NULL ORDER BY bundesland, month").show(5) + + input(">> 11c: Check Spark UI (Zugriffspläne/Storage). Enter...") + + +# --------------------------------------------------------- +# AUFGABE 12 +# --------------------------------------------------------- + +def task_12_stocks_analysis(spark: SparkSession): + print("\n--- Aufgabe 12: Stocks & Portfolio ---") + + # a) Erstes und letztes Datum je Symbol + print("a) Min/Max Datum pro Symbol") + t0 = time.time() + q_a = """ + SELECT symbol, MIN(date) as first_date, MAX(date) as last_date + FROM stocks + GROUP BY symbol + ORDER BY symbol + """ + spark.sql(q_a).show(5) + print(f"Zeit a): {time.time()-t0:.2f}s") + + # b) Aggregationen 2009 + print("\nb) High/Low/Avg Close 2009") + t0 = time.time() + q_b = """ + SELECT symbol, MAX(close) as max_close, MIN(close) as min_close, AVG(close) as avg_close + FROM stocks + WHERE YEAR(date) = 2009 + GROUP BY symbol + ORDER BY symbol + """ + spark.sql(q_b).show(5) + print(f"Zeit b): {time.time()-t0:.2f}s") + + # c) Lateral View (Explode Portfolio) + print("\nc) Lateral View: Aktien in Portfolios") + t0 = time.time() + + q_c = """ + SELECT + h.symbol, + SUM(h.amount) as total_shares, + COUNT(p.portfolioId) as num_portfolios, + AVG(h.amount) as avg_per_portfolio + FROM portfolio p + LATERAL VIEW explode(holdings) t AS h + GROUP BY h.symbol + ORDER BY h.symbol + """ + spark.sql(q_c).show(5) + print(f"Zeit c): {time.time()-t0:.2f}s") + + # d) Symbole in keinem Portfolio (Anti Join) + print("\nd) Symbole ohne Portfolio") + t0 = time.time() + q_d = """ + SELECT DISTINCT s.symbol + FROM stocks s + LEFT ANTI JOIN ( + SELECT DISTINCT h.symbol + FROM portfolio p + LATERAL VIEW explode(holdings) t AS h + ) p_sym ON s.symbol = p_sym.symbol + ORDER BY s.symbol + """ + spark.sql(q_d).show(5) + print(f"Zeit d): {time.time()-t0:.2f}s") + + input(">> 12 a-d fertig. Check UI. Enter für e)...") + + # e) Portfolio Wert Ende 2010 + print("\ne) Portfolio Bewertung Ende 2010") + t0 = time.time() + + q_last_price = """ + SELECT symbol, close + FROM ( + SELECT + symbol, + close, + ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY date DESC) as rn + FROM stocks + WHERE YEAR(date) = 2010 + ) tmp + WHERE rn = 1 + """ + spark.sql(q_last_price).createOrReplaceTempView("stocks_2010_end") + + # Schritt 2: Portfolio explodieren, mit Preis joinen, berechnen, summieren + q_val = """ + SELECT + p.portfolioId, + SUM(h.amount * s.close) as portfolio_value_2010 + FROM portfolio p + LATERAL VIEW explode(holdings) t AS h + JOIN stocks_2010_end s ON h.symbol = s.symbol + GROUP BY p.portfolioId + ORDER BY p.portfolioId + """ + spark.sql(q_val).show(5) + print(f"Zeit e): {time.time()-t0:.2f}s") + + +def main(scon, spark): + init_view_stations(spark) + + # Aufgabe 11 + task_11a_rollup(spark, station_name="Kempten") + task_11b_rank(spark) + task_11c_groupingsets(spark) + + # Aufgabe 12 + init_view_stocks(spark) + task_12_stocks_analysis(spark) + +if __name__ == '__main__': + main(scon, spark) \ No newline at end of file