from sparkstart import scon, spark from pyspark.sql import SparkSession import matplotlib.pyplot as plt import pandas as pd HDFSPATH = "hdfs://193.174.205.250:54310/" HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/" def read_parquets(spark: SparkSession): stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet" products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet" stations_df = spark.read.parquet(stations_path) stations_df.createOrReplaceTempView("german_stations") products_df = spark.read.parquet(products_path) products_df.createOrReplaceTempView("german_stations_data") stations_df.cache() products_df.cache() def task_11a_rollup(spark: SparkSession, station_name="Kempten"): print(f"\n--- Aufgabe 11a: Rollup & Plotting für {station_name} ---") start_time = time.time() # 1. Station ID finden # Case-insensitive search sid_df = spark.sql(f"SELECT stationId FROM german_stations WHERE lower(station_name) LIKE '%{station_name.lower()}%'") try: sid = sid_df.collect()[0]['stationId'] print(f"Station found: {station_name} -> ID {sid}") except IndexError: print(f"Station {station_name} nicht gefunden.") return # 2. Rollup Query vorbereiten # FIX: Parse string date 'YYYYMMDD' to real DATE object first q_prep = f""" SELECT YEAR(TO_DATE(date, 'yyyyMMdd')) as yr, QUARTER(TO_DATE(date, 'yyyyMMdd')) as qt, MONTH(TO_DATE(date, 'yyyyMMdd')) as mo, DAY(TO_DATE(date, 'yyyyMMdd')) as da, TT_TU FROM german_stations_data WHERE stationId = {sid} AND TT_TU IS NOT NULL AND TT_TU > -50 AND TT_TU < 60 """ spark.sql(q_prep).createOrReplaceTempView("data_prep") # 3. Rollup Execution # Note: We use string construction for quarters/months to ensure we get a valid date string for plotting q_rollup = """ SELECT yr, qt, mo, da, MIN(TT_TU) as min_temp, MAX(TT_TU) as max_temp, AVG(TT_TU) as avg_temp, -- Construct dates for plotting (handling the NULLs from ROLLUP) -- For Quarter: Use 1st month of quarter DATE(concat_ws('-', yr, cast(qt*3-2 as int), '01')) as qt_date, -- For Month: Use 1st day of month MAKE_DATE(yr, mo, 1) as mo_date, -- For Year: Use Jan 1st MAKE_DATE(yr, 1, 1) as yr_date, -- For Day: Use actual date MAKE_DATE(yr, mo, da) as da_date FROM data_prep GROUP BY ROLLUP(yr, qt, mo, da) """ df_rollup = spark.sql(q_rollup) df_rollup.cache() df_rollup.createOrReplaceTempView("station_rollup") # Trigger Action count = df_rollup.count() print(f"Rollup berechnet. Zeilen: {count}. Dauer: {time.time() - start_time:.2f}s") # --- PLOTTING --- # Plot 1: Tageswerte (letzte 3 Jahre) # Filter: All levels must be present (not null) q_days = """ SELECT da_date as date, avg_temp FROM station_rollup WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NOT NULL AND yr >= (SELECT MAX(yr) - 2 FROM station_rollup WHERE yr IS NOT NULL) ORDER BY date """ pdf_days = spark.sql(q_days).toPandas() if pdf_days.empty: print("Warnung: Keine Daten für Tages-Plot gefunden.") else: plt.figure(1, figsize=(10, 5)) plt.plot(pdf_days['date'], pdf_days['avg_temp'], label='Daily Avg', linewidth=0.5) plt.title(f"{station_name}: Daily Average (Last 3 Years)") plt.xlabel('Date') plt.ylabel('Temp °C') plt.tight_layout() plt.show() # Plot 2: Monatswerte (10-20 Jahre) # Filter: Day is NULL (aggregation level), but Month is NOT NULL q_months = """ SELECT mo_date as date, avg_temp FROM station_rollup WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NULL AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL) ORDER BY date """ pdf_months = spark.sql(q_months).toPandas() if not pdf_months.empty: plt.figure(2, figsize=(10, 5)) plt.plot(pdf_months['date'], pdf_months['avg_temp'], color='green', label='Monthly Avg') plt.title(f"{station_name}: Monthly Average (Last 20 Years)") plt.xlabel('Date') plt.ylabel('Temp °C') plt.tight_layout() plt.show() # Plot 3: Quartalswerte # Filter: Month is NULL, Quarter is NOT NULL q_quarters = """ SELECT qt_date as date, avg_temp FROM station_rollup WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NULL AND da IS NULL AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL) ORDER BY date """ pdf_quarters = spark.sql(q_quarters).toPandas() if not pdf_quarters.empty: plt.figure(3, figsize=(10, 5)) plt.plot(pdf_quarters['date'], pdf_quarters['avg_temp'], color='orange', marker='o', linestyle='-', label='Quarterly Avg') plt.title(f"{station_name}: Quarterly Average (Last 20 Years)") plt.tight_layout() plt.show() # Plot 4: Jahreswerte # Filter: Quarter is NULL, Year is NOT NULL q_years = """ SELECT yr_date as date, min_temp, max_temp, avg_temp FROM station_rollup WHERE yr IS NOT NULL AND qt IS NULL AND mo IS NULL AND da IS NULL AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL) ORDER BY date """ pdf_years = spark.sql(q_years).toPandas() if not pdf_years.empty: plt.figure(4, figsize=(10, 5)) plt.plot(pdf_years['date'], pdf_years['max_temp'], color='red', label='Max') plt.plot(pdf_years['date'], pdf_years['avg_temp'], color='black', label='Avg') plt.plot(pdf_years['date'], pdf_years['min_temp'], color='blue', label='Min') plt.title(f"{station_name}: Yearly Aggregates (Last 20 Years)") plt.legend() plt.tight_layout() plt.show() def task_11b_rank(spark: SparkSession): print("\n--- Aufgabe 11b: TempMonat Ranking ---") q_tempmonat = """ SELECT d.stationId, s.station_name, SUBSTR(CAST(d.date AS STRING), 1, 4) as year, SUBSTR(CAST(d.date AS STRING), 6, 2) as month, MIN(d.TT_TU) as min_t, MAX(d.TT_TU) as max_t, AVG(d.TT_TU) as avg_t FROM german_stations_data d JOIN german_stations s ON d.stationId = s.stationId WHERE d.TT_TU IS NOT NULL AND d.TT_TU > -50 GROUP BY d.stationId, s.station_name, year, month """ df_tm = spark.sql(q_tempmonat) df_tm.createOrReplaceTempView("tempmonat") # 1. Ranking Partitioniert nach Monat im Jahr 2015 print(" > Berechne Ranking für 2015 (partitioniert nach Monat)...") q_rank_2015 = """ SELECT month, station_name, min_t, RANK() OVER (PARTITION BY month ORDER BY min_t ASC) as rank_min, RANK() OVER (PARTITION BY month ORDER BY max_t ASC) as rank_max, RANK() OVER (PARTITION BY month ORDER BY avg_t ASC) as rank_avg FROM tempmonat WHERE year = '2015' ORDER BY rank_min, month """ spark.sql(q_rank_2015).show(10) # 2. Globales Ranking (über alle Monate/Jahre hinweg) print(" > Berechne Ranking global (kälteste Monate aller Zeiten)...") q_rank_global = """ SELECT year, month, station_name, min_t, RANK() OVER (ORDER BY min_t ASC) as rank_min, RANK() OVER (ORDER BY max_t ASC) as rank_max, RANK() OVER (ORDER BY avg_t ASC) as rank_avg FROM tempmonat ORDER BY rank_min """ spark.sql(q_rank_global).show(10) print("11b: Fertig.") def task_11c_groupingsets(spark: SparkSession): print("\n--- Aufgabe 11c: Grouping Sets ---") q_prep = """ SELECT CAST(SUBSTR(CAST(d.date AS STRING), 1, 4) AS INT) as year, CAST(SUBSTR(CAST(d.date AS STRING), 6, 2) AS INT) as month, s.station_name, s.bundesland, d.TT_TU FROM german_stations_data d JOIN german_stations s ON d.stationId = s.stationId WHERE d.TT_TU > -50 """ spark.sql(q_prep).createOrReplaceTempView("gs_base") q_sets = """ SELECT year, month, bundesland, station_name, MIN(TT_TU) as min_t, MAX(TT_TU) as max_t, AVG(TT_TU) as avg_t FROM gs_base GROUP BY GROUPING SETS ( (year, bundesland), (year, station_name), (month, bundesland) ) """ df_gs = spark.sql(q_sets) df_gs.cache() df_gs.createOrReplaceTempView("grouping_result") # Action zum Cachen df_gs.count() print("Grouping Sets berechnet.") print("Auswahl 1: Jahr & Bundesland") spark.sql("SELECT year, bundesland, avg_t FROM grouping_result WHERE station_name IS NULL AND month IS NULL ORDER BY year DESC, bundesland").show(5) print("Auswahl 2: Jahr & Station") spark.sql("SELECT year, station_name, avg_t FROM grouping_result WHERE bundesland IS NULL AND month IS NULL ORDER BY year DESC, station_name").show(5) print("Auswahl 3: Monat & Bundesland (Jahreszeitlicher Verlauf je Land)") spark.sql("SELECT month, bundesland, avg_t FROM grouping_result WHERE year IS NULL AND station_name IS NULL ORDER BY bundesland, month").show(5) def main(scon, spark): read_parquets(spark) # Aufgabe 11 task_11a_rollup(spark, station_name="Kempten") task_11b_rank(spark) task_11c_groupingsets(spark) if __name__ == '__main__': main(scon, spark)