def compute_daily_and_yearly_frosts(spark: SparkSession):
    """Build the ``station_year_frost`` temp view: frost days per station per year.

    Pipeline (each step registered as a temp view for the next):
      1. ``daily_max``        - per-station daily maximum temperature,
      2. ``daily_frost``      - 0/1 flag per station-day (frost = daily max < 0 °C),
      3. ``station_year_frost`` - frost-day count per station and year.

    Reads the ``german_stations_data`` view; has no return value — its result is
    the registered ``station_year_frost`` view consumed by the analysis functions.
    """
    # Daily maximum per station. The range filter (-50, 60) drops physically
    # implausible readings (e.g. -999 sentinel values) that would otherwise
    # corrupt MAX().
    q_daily_max = (
        "SELECT stationId, date, SUBSTR(CAST(date AS STRING),1,4) AS year, MAX(TT_TU) AS max_temp "
        "FROM german_stations_data "
        "WHERE TT_TU IS NOT NULL AND TT_TU > -50 AND TT_TU < 60 "
        "GROUP BY stationId, date"
    )
    daily_max = spark.sql(q_daily_max)
    daily_max.createOrReplaceTempView('daily_max')

    # A day counts as a frost day when even its maximum stayed below 0 °C.
    q_daily_frost = (
        "SELECT stationId, year, CASE WHEN max_temp < 0 THEN 1 ELSE 0 END AS is_frost "
        "FROM daily_frost_src"
        if False else
        "SELECT stationId, year, CASE WHEN max_temp < 0 THEN 1 ELSE 0 END AS is_frost "
        "FROM daily_max"
    )
    daily_frost = spark.sql(q_daily_frost)
    daily_frost.createOrReplaceTempView('daily_frost')

    # Yearly frost-day totals per station.
    q_station_year = (
        "SELECT stationId, year, SUM(is_frost) AS frost_days "
        "FROM daily_frost GROUP BY stationId, year"
    )
    station_year_frost = spark.sql(q_station_year)
    station_year_frost.createOrReplaceTempView('station_year_frost')
station + q_station_year = ( + "SELECT stationId, year, SUM(is_frost) AS frost_days " + "FROM daily_frost GROUP BY stationId, year" + ) + station_year_frost = spark.sql(q_station_year) + station_year_frost.createOrReplaceTempView('station_year_frost') def frost_analysis(spark: SparkSession, year=2024, station_name_matches=('kempten',)): - compute_daily_and_yearly_frosts(spark) + compute_daily_and_yearly_frosts(spark) - # Debug: check available years and data - spark.sql("SELECT year, COUNT(*) as cnt FROM station_year_frost GROUP BY year ORDER BY year").show(50) + q_hist = ( + f"SELECT frost_days, COUNT(*) AS station_count " + f"FROM station_year_frost WHERE year = '{year}' GROUP BY frost_days ORDER BY frost_days" + ) + hist_df = spark.sql(q_hist) - q_hist = ( - f"SELECT frost_days, COUNT(*) AS station_count " - f"FROM station_year_frost WHERE year = '{year}' GROUP BY frost_days ORDER BY frost_days" - ) - hist_df = spark.sql(q_hist) + hist_pdf = hist_df.toPandas() + + if hist_pdf.empty: + print(f"No frost data found for year {year}. Trying to find available years...") + q_all = "SELECT frost_days, COUNT(*) AS station_count FROM station_year_frost GROUP BY frost_days ORDER BY frost_days" + hist_pdf = spark.sql(q_all).toPandas() + if hist_pdf.empty: + print("No frost data available at all. Check if TT_TU column contains valid temperature data.") + return + print(f"Found {len(hist_pdf)} frost day categories across all years") + + plt.figure(figsize=(8, 5)) + plt.bar(hist_pdf.frost_days, hist_pdf.station_count, color='steelblue') + plt.xlabel('Number of Frost Days in year ' + str(year)) + plt.ylabel('Number of Stations') + plt.title(f'Stations vs Frost Days ({year})') + plt.tight_layout() + plt.show() - hist_pdf = hist_df.toPandas() - - if hist_pdf.empty: - print(f"No frost data found for year {year}. 
Trying to find available years...") - # Try without year filter to see if data exists - q_all = "SELECT frost_days, COUNT(*) AS station_count FROM station_year_frost GROUP BY frost_days ORDER BY frost_days" - hist_pdf = spark.sql(q_all).toPandas() - if hist_pdf.empty: - print("No frost data available at all. Check if TT_TU column contains valid temperature data.") - return - print(f"Found {len(hist_pdf)} frost day categories across all years") - - plt.figure(figsize=(8, 5)) - plt.bar(hist_pdf.frost_days, hist_pdf.station_count, color='steelblue') - plt.xlabel('Number of Frost Days in year ' + str(year)) - plt.ylabel('Number of Stations') - plt.title(f'Stations vs Frost Days ({year})') - plt.tight_layout() - plt.show() + for name in station_name_matches: + q_find = f"SELECT stationId, station_name FROM german_stations WHERE lower(station_name) LIKE '%{name.lower()}%'" + ids_df = spark.sql(q_find) + ids = ids_df.collect() + if not ids: + print(f"No stations found matching '{name}'") + continue + for r in ids: + sid = r['stationId'] + sname = r['station_name'] + print(f"Analyzing stationId={sid} name={sname}") - for name in station_name_matches: - q_find = f"SELECT stationId, station_name FROM german_stations WHERE lower(station_name) LIKE '%{name.lower()}%'" - ids_df = spark.sql(q_find) - ids = ids_df.collect() - if not ids: - print(f"No stations found matching '{name}'") - continue - for r in ids: - sid = r['stationId'] - sname = r['station_name'] - print(f"Analyzing stationId={sid} name={sname}") + q_ts = ( + "SELECT year, frost_days, " + "AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_5, " + "AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) RANGE BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg_20 " + f"FROM station_year_frost WHERE stationId = {sid} ORDER BY CAST(year AS INT)" + ) + ts_df = spark.sql(q_ts) - # compute frostdays + 5-yr and 20-yr rolling averages using 
window frame - q_ts = ( - "SELECT year, frost_days, " - "AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_5, " - "AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg_20 " - f"FROM station_year_frost WHERE stationId = {sid} ORDER BY CAST(year AS INT)" - ) - ts_df = spark.sql(q_ts) + pdf = ts_df.toPandas() + if pdf.empty: + print(f"No yearly frost data for station {sid}") + continue - pdf = ts_df.toPandas() - if pdf.empty: - print(f"No yearly frost data for station {sid}") - continue - - pdf['year'] = pdf['year'].astype(int) - plt.figure(figsize=(10, 5)) - plt.plot(pdf.year, pdf.frost_days, label='Frostdays (year)', marker='o') - plt.plot(pdf.year, pdf.avg_5, label='5-year avg', linestyle='--') - plt.plot(pdf.year, pdf.avg_20, label='20-year avg', linestyle=':') - plt.xlabel('Year') - plt.ylabel('Frost Days') - plt.title(f'Frost Days over Years for {sname} (station {sid})') - plt.legend() - plt.tight_layout() - plt.show() + pdf['year'] = pdf['year'].astype(int) + plt.figure(figsize=(10, 5)) + plt.plot(pdf.year, pdf.frost_days, label='Frostdays (year)', marker='o') + plt.plot(pdf.year, pdf.avg_5, label='5-year avg', linestyle='--') + plt.plot(pdf.year, pdf.avg_20, label='20-year avg', linestyle=':') + plt.xlabel('Year') + plt.ylabel('Frost Days') + plt.title(f'Frost Days over Years for {sname} (station {sid})') + plt.legend() + plt.tight_layout() + plt.show() def height_frost_correlation(spark: SparkSession):