Mirror of https://github.com/Vale54321/BigData.git
Aufgabe 10 (Exercise 10)
@@ -59,101 +59,96 @@ def duration_circle_size(spark: SparkSession):
 def compute_daily_and_yearly_frosts(spark: SparkSession):
     q_daily_max = (
         "SELECT stationId, date, SUBSTR(CAST(date AS STRING),1,4) AS year, MAX(TT_TU) AS max_temp "
         "FROM german_stations_data "
-        "WHERE TT_TU IS NOT NULL "
+        "WHERE TT_TU IS NOT NULL AND TT_TU > -50 AND TT_TU < 60 "
         "GROUP BY stationId, date"
     )
     daily_max = spark.sql(q_daily_max)
     daily_max.createOrReplaceTempView('daily_max')
 
     # mark a day as frost if max_temp < 0
     q_daily_frost = (
         "SELECT stationId, year, CASE WHEN max_temp < 0 THEN 1 ELSE 0 END AS is_frost "
         "FROM daily_max"
     )
     daily_frost = spark.sql(q_daily_frost)
     daily_frost.createOrReplaceTempView('daily_frost')
 
     # yearly frostdays per station
     q_station_year = (
         "SELECT stationId, year, SUM(is_frost) AS frost_days "
         "FROM daily_frost GROUP BY stationId, year"
     )
     station_year_frost = spark.sql(q_station_year)
     station_year_frost.createOrReplaceTempView('station_year_frost')
 
 
 def frost_analysis(spark: SparkSession, year=2024, station_name_matches=('kempten',)):
     compute_daily_and_yearly_frosts(spark)
 
-    # Debug: check available years and data
-    spark.sql("SELECT year, COUNT(*) as cnt FROM station_year_frost GROUP BY year ORDER BY year").show(50)
-
     q_hist = (
         f"SELECT frost_days, COUNT(*) AS station_count "
         f"FROM station_year_frost WHERE year = '{year}' GROUP BY frost_days ORDER BY frost_days"
     )
     hist_df = spark.sql(q_hist)
 
     hist_pdf = hist_df.toPandas()
     if hist_pdf.empty:
         print(f"No frost data found for year {year}. Trying to find available years...")
-        # Try without year filter to see if data exists
         q_all = "SELECT frost_days, COUNT(*) AS station_count FROM station_year_frost GROUP BY frost_days ORDER BY frost_days"
         hist_pdf = spark.sql(q_all).toPandas()
         if hist_pdf.empty:
             print("No frost data available at all. Check if TT_TU column contains valid temperature data.")
             return
         print(f"Found {len(hist_pdf)} frost day categories across all years")
 
     plt.figure(figsize=(8, 5))
     plt.bar(hist_pdf.frost_days, hist_pdf.station_count, color='steelblue')
     plt.xlabel('Number of Frost Days in year ' + str(year))
     plt.ylabel('Number of Stations')
     plt.title(f'Stations vs Frost Days ({year})')
     plt.tight_layout()
     plt.show()
 
     for name in station_name_matches:
         q_find = f"SELECT stationId, station_name FROM german_stations WHERE lower(station_name) LIKE '%{name.lower()}%'"
         ids_df = spark.sql(q_find)
         ids = ids_df.collect()
         if not ids:
             print(f"No stations found matching '{name}'")
             continue
         for r in ids:
             sid = r['stationId']
             sname = r['station_name']
             print(f"Analyzing stationId={sid} name={sname}")
 
-            # compute frostdays + 5-yr and 20-yr rolling averages using window frame
             q_ts = (
                 "SELECT year, frost_days, "
-                "AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_5, "
-                "AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg_20 "
+                "AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_5, "
+                "AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) RANGE BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg_20 "
                 f"FROM station_year_frost WHERE stationId = {sid} ORDER BY CAST(year AS INT)"
             )
             ts_df = spark.sql(q_ts)
 
             pdf = ts_df.toPandas()
             if pdf.empty:
                 print(f"No yearly frost data for station {sid}")
                 continue
 
             pdf['year'] = pdf['year'].astype(int)
             plt.figure(figsize=(10, 5))
             plt.plot(pdf.year, pdf.frost_days, label='Frostdays (year)', marker='o')
             plt.plot(pdf.year, pdf.avg_5, label='5-year avg', linestyle='--')
             plt.plot(pdf.year, pdf.avg_20, label='20-year avg', linestyle=':')
             plt.xlabel('Year')
             plt.ylabel('Frost Days')
             plt.title(f'Frost Days over Years for {sname} (station {sid})')
             plt.legend()
             plt.tight_layout()
             plt.show()
 
 
 def height_frost_correlation(spark: SparkSession):
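Two changes stand out in this hunk: the daily-maximum query now drops implausible TT_TU readings (anything outside -50..60 °C, which also excludes sentinel values such as -999), and the 5-/20-year rolling averages switch from ROWS to RANGE frames. With ORDER BY CAST(year AS INT), a RANGE frame is bounded by the year values themselves, so a station with missing years no longer pulls older rows into the window. The sketch below is not part of the commit; it builds a made-up demo_frost view on a local SparkSession purely to contrast the two frame types.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("frame-demo").getOrCreate()

# Toy yearly frost-day series with a gap: 2019 is missing.
rows = [(1, 2016, 40), (1, 2017, 35), (1, 2018, 30), (1, 2020, 20), (1, 2021, 25)]
spark.createDataFrame(rows, ["stationId", "year", "frost_days"]) \
    .createOrReplaceTempView("demo_frost")

spark.sql(
    "SELECT year, frost_days, "
    "AVG(frost_days) OVER (PARTITION BY stationId ORDER BY year "
    "ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_rows, "
    "AVG(frost_days) OVER (PARTITION BY stationId ORDER BY year "
    "RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_range "
    "FROM demo_frost ORDER BY year"
).show()
# For 2021 the ROWS frame averages all five rows back to 2016,
# while the RANGE frame only spans the years 2017-2021.

Note that a RANGE frame with a numeric offset requires exactly one numeric ORDER BY expression, which is why the query in the commit casts year to INT.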
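For completeness, a hypothetical driver showing how frost_analysis would typically be invoked. The view names german_stations_data and german_stations are taken from the queries above; the loader that registers them, and the matplotlib.pyplot import the module relies on as plt, are not part of this commit and are only assumed here.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("frost-analysis")
    .getOrCreate()
)

# Assumed: project code loads the DWD station data and registers the
# 'german_stations_data' and 'german_stations' temp views before this call.

# Histogram of frost days per station for 2024, plus the Kempten time series
# with 5-year and 20-year rolling averages.
frost_analysis(spark, year=2024, station_name_matches=("kempten",))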