from sparkstart import scon, spark from pyspark.sql import SparkSession import time import matplotlib.pyplot as plt import pandas as pd HDFSPATH_STATIONS = "hdfs://193.174.205.250:54310/home/heiserervalentin/" HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/" def init_view_stations(spark): """Lädt die Stationsdaten für Aufgabe 11""" s_path = HDFSPATH_STATIONS + "german_stations.parquet" d_path = HDFSPATH_STATIONS + "german_stations_data.parquet" spark.read.parquet(s_path).createOrReplaceTempView("german_stations") spark.read.parquet(d_path).createOrReplaceTempView("german_stations_data") def init_view_stocks(spark): """Lädt die Stocks-Daten für Aufgabe 12""" # Hinweis: Pfade anpassen, falls sie im Cluster anders liegen spark.read.parquet(HDFSPATH_STOCKS + "stocks.parquet").createOrReplaceTempView("stocks") spark.read.parquet(HDFSPATH_STOCKS + "portfolio.parquet").createOrReplaceTempView("portfolio") # --------------------------------------------------------- # AUFGABE 11 # --------------------------------------------------------- def task_11a_rollup(spark: SparkSession, station_name="Kempten"): print(f"\n--- Aufgabe 11a: Rollup & Plotting für {station_name} ---") start = time.time() # 1. Station ID finden sid_df = spark.sql(f"SELECT stationId FROM german_stations WHERE station_name LIKE '%{station_name}%'") try: sid = sid_df.collect()[0]['stationId'] except IndexError: print(f"Station {station_name} nicht gefunden.") return # 2. Rollup Query vorbereiten q_prep = f""" SELECT YEAR(date) as yr, QUARTER(date) as qt, MONTH(date) as mo, DAY(date) as da, TT_TU FROM german_stations_data WHERE stationId = {sid} AND TT_TU IS NOT NULL AND TT_TU > -50 """ spark.sql(q_prep).createOrReplaceTempView("data_prep") q_rollup = """ SELECT yr, qt, mo, da, MIN(TT_TU) as min_temp, MAX(TT_TU) as max_temp, AVG(TT_TU) as avg_temp, -- Datums-Konstruktion für Plots DATE(STRING(yr) || '-' || STRING(qt*3-2) || '-01') as qt_date, MAKE_DATE(yr, mo, 1) as mo_date, MAKE_DATE(yr, 1, 1) as yr_date, MAKE_DATE(yr, mo, da) as da_date FROM data_prep GROUP BY ROLLUP(yr, qt, mo, da) """ df_rollup = spark.sql(q_rollup) df_rollup.cache() df_rollup.createOrReplaceTempView("station_rollup") # Trigger Action for Cache & Time Measurement count = df_rollup.count() print(f"Rollup berechnet. Zeilen: {count}. Dauer: {time.time() - start:.2f}s") input(">> 11a: Check Spark UI (Stages/Storage) jetzt. Enter für Plots...") # Plot 1: Tageswerte (letzte 3 Jahre der Daten) q_days = """ SELECT da_date as date, avg_temp FROM station_rollup WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NOT NULL AND yr >= (SELECT MAX(yr) - 2 FROM station_rollup) ORDER BY date """ pdf_days = spark.sql(q_days).toPandas() plt.figure(1, figsize=(10, 5)) plt.plot(pdf_days['date'], pdf_days['avg_temp'], label='Daily Avg', linewidth=0.5) plt.title(f"{station_name}: Daily Average (Last 3 Years)") plt.xlabel('Date') plt.ylabel('Temp °C') plt.tight_layout() plt.show() # Plot 2: Monatswerte (10-20 Jahre) q_months = """ SELECT mo_date as date, avg_temp FROM station_rollup WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NULL AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) ORDER BY date """ pdf_months = spark.sql(q_months).toPandas() plt.figure(2, figsize=(10, 5)) plt.plot(pdf_months['date'], pdf_months['avg_temp'], color='green', label='Monthly Avg') plt.title(f"{station_name}: Monthly Average (Last 20 Years)") plt.xlabel('Date') plt.ylabel('Temp °C') plt.tight_layout() plt.show() # Plot 3: Quartalswerte q_quarters = """ SELECT qt_date as date, avg_temp FROM station_rollup WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NULL AND da IS NULL AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) ORDER BY date """ pdf_quarters = spark.sql(q_quarters).toPandas() plt.figure(3, figsize=(10, 5)) plt.plot(pdf_quarters['date'], pdf_quarters['avg_temp'], color='orange', marker='o', linestyle='-', label='Quarterly Avg') plt.title(f"{station_name}: Quarterly Average (Last 20 Years)") plt.show() # Plot 4: Jahreswerte q_years = """ SELECT yr_date as date, min_temp, max_temp, avg_temp FROM station_rollup WHERE yr IS NOT NULL AND qt IS NULL AND mo IS NULL AND da IS NULL AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) ORDER BY date """ pdf_years = spark.sql(q_years).toPandas() plt.figure(4, figsize=(10, 5)) plt.plot(pdf_years['date'], pdf_years['max_temp'], color='red', label='Max') plt.plot(pdf_years['date'], pdf_years['avg_temp'], color='black', label='Avg') plt.plot(pdf_years['date'], pdf_years['min_temp'], color='blue', label='Min') plt.title(f"{station_name}: Yearly Aggregates (Last 20 Years)") plt.legend() plt.show() def task_11b_rank(spark: SparkSession): print("\n--- Aufgabe 11b: TempMonat Ranking ---") start = time.time() q_tempmonat = """ SELECT d.stationId, s.station_name, SUBSTR(CAST(d.date AS STRING), 1, 4) as year, SUBSTR(CAST(d.date AS STRING), 6, 2) as month, MIN(d.TT_TU) as min_t, MAX(d.TT_TU) as max_t, AVG(d.TT_TU) as avg_t FROM german_stations_data d JOIN german_stations s ON d.stationId = s.stationId WHERE d.TT_TU IS NOT NULL AND d.TT_TU > -50 GROUP BY d.stationId, s.station_name, year, month """ df_tm = spark.sql(q_tempmonat) df_tm.createOrReplaceTempView("tempmonat") # 1. Ranking Partitioniert nach Monat im Jahr 2015 print(" > Berechne Ranking für 2015 (partitioniert nach Monat)...") q_rank_2015 = """ SELECT month, station_name, min_t, RANK() OVER (PARTITION BY month ORDER BY min_t ASC) as rank_min, RANK() OVER (PARTITION BY month ORDER BY max_t ASC) as rank_max, RANK() OVER (PARTITION BY month ORDER BY avg_t ASC) as rank_avg FROM tempmonat WHERE year = '2015' ORDER BY rank_min, month """ spark.sql(q_rank_2015).show(10) # 2. Globales Ranking (über alle Monate/Jahre hinweg) print(" > Berechne Ranking global (kälteste Monate aller Zeiten)...") q_rank_global = """ SELECT year, month, station_name, min_t, RANK() OVER (ORDER BY min_t ASC) as rank_min, RANK() OVER (ORDER BY max_t ASC) as rank_max, RANK() OVER (ORDER BY avg_t ASC) as rank_avg FROM tempmonat ORDER BY rank_min """ spark.sql(q_rank_global).show(10) print(f"Dauer 11b: {time.time() - start:.2f}s") input(">> 11b: Check Spark UI (Jobs/Stages). Enter...") def task_11c_groupingsets(spark: SparkSession): print("\n--- Aufgabe 11c: Grouping Sets ---") start = time.time() q_prep = """ SELECT CAST(SUBSTR(CAST(d.date AS STRING), 1, 4) AS INT) as year, CAST(SUBSTR(CAST(d.date AS STRING), 6, 2) AS INT) as month, s.station_name, s.bundesland, d.TT_TU FROM german_stations_data d JOIN german_stations s ON d.stationId = s.stationId WHERE d.TT_TU > -50 """ spark.sql(q_prep).createOrReplaceTempView("gs_base") q_sets = """ SELECT year, month, bundesland, station_name, MIN(TT_TU) as min_t, MAX(TT_TU) as max_t, AVG(TT_TU) as avg_t FROM gs_base GROUP BY GROUPING SETS ( (year, bundesland), (year, station_name), (month, bundesland) ) """ df_gs = spark.sql(q_sets) df_gs.cache() df_gs.createOrReplaceTempView("grouping_result") # Action zum Cachen df_gs.count() print(f"Grouping Sets berechnet. Dauer: {time.time() - start:.2f}s") print("Auswahl 1: Jahr & Bundesland") spark.sql("SELECT year, bundesland, avg_t FROM grouping_result WHERE station_name IS NULL AND month IS NULL ORDER BY year DESC, bundesland").show(5) print("Auswahl 2: Jahr & Station") spark.sql("SELECT year, station_name, avg_t FROM grouping_result WHERE bundesland IS NULL AND month IS NULL ORDER BY year DESC, station_name").show(5) print("Auswahl 3: Monat & Bundesland (Jahreszeitlicher Verlauf je Land)") spark.sql("SELECT month, bundesland, avg_t FROM grouping_result WHERE year IS NULL AND station_name IS NULL ORDER BY bundesland, month").show(5) input(">> 11c: Check Spark UI (Zugriffspläne/Storage). Enter...") # --------------------------------------------------------- # AUFGABE 12 # --------------------------------------------------------- def task_12_stocks_analysis(spark: SparkSession): print("\n--- Aufgabe 12: Stocks & Portfolio ---") # a) Erstes und letztes Datum je Symbol print("a) Min/Max Datum pro Symbol") t0 = time.time() q_a = """ SELECT symbol, MIN(date) as first_date, MAX(date) as last_date FROM stocks GROUP BY symbol ORDER BY symbol """ spark.sql(q_a).show(5) print(f"Zeit a): {time.time()-t0:.2f}s") # b) Aggregationen 2009 print("\nb) High/Low/Avg Close 2009") t0 = time.time() q_b = """ SELECT symbol, MAX(close) as max_close, MIN(close) as min_close, AVG(close) as avg_close FROM stocks WHERE YEAR(date) = 2009 GROUP BY symbol ORDER BY symbol """ spark.sql(q_b).show(5) print(f"Zeit b): {time.time()-t0:.2f}s") # c) Lateral View (Explode Portfolio) print("\nc) Lateral View: Aktien in Portfolios") t0 = time.time() q_c = """ SELECT h.symbol, SUM(h.amount) as total_shares, COUNT(p.portfolioId) as num_portfolios, AVG(h.amount) as avg_per_portfolio FROM portfolio p LATERAL VIEW explode(holdings) t AS h GROUP BY h.symbol ORDER BY h.symbol """ spark.sql(q_c).show(5) print(f"Zeit c): {time.time()-t0:.2f}s") # d) Symbole in keinem Portfolio (Anti Join) print("\nd) Symbole ohne Portfolio") t0 = time.time() q_d = """ SELECT DISTINCT s.symbol FROM stocks s LEFT ANTI JOIN ( SELECT DISTINCT h.symbol FROM portfolio p LATERAL VIEW explode(holdings) t AS h ) p_sym ON s.symbol = p_sym.symbol ORDER BY s.symbol """ spark.sql(q_d).show(5) print(f"Zeit d): {time.time()-t0:.2f}s") input(">> 12 a-d fertig. Check UI. Enter für e)...") # e) Portfolio Wert Ende 2010 print("\ne) Portfolio Bewertung Ende 2010") t0 = time.time() q_last_price = """ SELECT symbol, close FROM ( SELECT symbol, close, ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY date DESC) as rn FROM stocks WHERE YEAR(date) = 2010 ) tmp WHERE rn = 1 """ spark.sql(q_last_price).createOrReplaceTempView("stocks_2010_end") # Schritt 2: Portfolio explodieren, mit Preis joinen, berechnen, summieren q_val = """ SELECT p.portfolioId, SUM(h.amount * s.close) as portfolio_value_2010 FROM portfolio p LATERAL VIEW explode(holdings) t AS h JOIN stocks_2010_end s ON h.symbol = s.symbol GROUP BY p.portfolioId ORDER BY p.portfolioId """ spark.sql(q_val).show(5) print(f"Zeit e): {time.time()-t0:.2f}s") def main(scon, spark): init_view_stations(spark) # Aufgabe 11 task_11a_rollup(spark, station_name="Kempten") task_11b_rank(spark) task_11c_groupingsets(spark) # Aufgabe 12 init_view_stocks(spark) task_12_stocks_analysis(spark) if __name__ == '__main__': main(scon, spark)