Files
BigData/Aufgabe 8/main.py
2025-11-28 13:23:06 +01:00

317 lines
9.0 KiB
Python

from sparkstart import scon, spark
import ghcnd_stations
import matplotlib.pyplot as plt
# a) Liste aller Stationen sortiert nach Stationsname
def get_all_stations():
result = spark.sql("""
SELECT *
FROM ghcndstations
ORDER BY stationname
""")
result.show(truncate=False)
# b) Anzahl der Stationen je Land
def get_station_count_per_country():
result = spark.sql("""
SELECT
c.countrycode,
c.countryname,
COUNT(s.stationid) AS count
FROM ghcndstations s
JOIN ghcndcountries c
ON s.countrycode = c.countrycode
GROUP BY
c.countrycode,
c.countryname
ORDER BY count DESC
""")
result.show(truncate=False)
# c) Stationen in Deutschland
def get_german_stations():
result = spark.sql("""
SELECT *
FROM ghcndstations
WHERE countrycode = 'GM'
ORDER BY stationname
""")
result.show(truncate=False)
# d) Plot TMAX und TMIN für Station und Jahr
def plot_temp_day(station_name, year):
df_filtered = spark.sql(f"""
SELECT
d.date,
d.value / 10.0 AS temp,
d.element
FROM ghcnddata d
JOIN ghcndstations s
ON d.stationid = s.stationid
WHERE
trim(upper(s.stationname)) = '{station_name.upper()}'
AND year(d.date) = {year}
AND d.element IN ('TMAX', 'TMIN')
ORDER BY d.date
""").collect()
if not df_filtered:
print(f"Keine Daten für Station '{station_name}' im Jahr {year} (oder Station nicht gefunden).")
return
# Daten in Dicts organisieren
tmax_data = {row['date']: row['temp'] for row in df_filtered if row['element'] == 'TMAX'}
tmin_data = {row['date']: row['temp'] for row in df_filtered if row['element'] == 'TMIN'}
# Sortieren nach Datum
dates = sorted(set(tmax_data.keys()) | set(tmin_data.keys()))
tmax_vals = [tmax_data.get(d, None) for d in dates]
tmin_vals = [tmin_data.get(d, None) for d in dates]
day_of_year = [d.timetuple().tm_yday for d in dates]
plt.plot(day_of_year, tmax_vals, 'r', label='TMAX')
plt.plot(day_of_year, tmin_vals, 'b', label='TMIN')
plt.xlabel('Tag des Jahres')
plt.ylabel('Temperatur (°C)')
plt.title(f'{station_name} {year}')
plt.legend()
plt.show()
# e) Gesamt-Niederschlag pro Jahr für Station
def plot_precip_year(station_name):
df_precip = spark.sql(f"""
SELECT
year(d.date) AS year,
SUM(d.value) / 10.0 AS total_precip
FROM ghcnddata d
JOIN ghcndstations s
ON d.stationid = s.stationid
WHERE
trim(upper(s.stationname)) = '{station_name.upper()}'
AND d.element = 'PRCP'
GROUP BY year(d.date)
ORDER BY year
""").collect()
if not df_precip:
print(f"Keine Niederschlagsdaten für Station '{station_name}'.")
return
years = [row['year'] for row in df_precip]
total_precip = [row['total_precip'] for row in df_precip]
plt.bar(years, total_precip)
plt.xlabel('Jahr')
plt.ylabel('Niederschlag (mm)')
plt.title(f'Gesamt-Niederschlag {station_name}')
plt.show()
# f) Durchschnitt TMAX pro Tag des Jahres, mit 21-Tage Durchschnitt
def plot_avg_tmax_day(station_name):
df_avg = spark.sql(f"""
SELECT
dayofyear(d.date) AS day,
AVG(d.value) / 10.0 AS avg_tmax
FROM ghcnddata d
JOIN ghcndstations s
ON d.stationid = s.stationid
WHERE
trim(upper(s.stationname)) = '{station_name.upper()}'
AND d.element = 'TMAX'
GROUP BY dayofyear(d.date)
ORDER BY day
""").collect()
if not df_avg:
print(f"Keine TMAX-Daten für Station '{station_name}'.")
return
days = [row['day'] for row in df_avg]
avg_tmax = [row['avg_tmax'] for row in df_avg]
#TODO: Mit SQL machen
# 21-Tage gleitender Durchschnitt (10 Tage davor, Tag selbst, 10 Tage danach)
rolling_avg = []
for i in range(len(avg_tmax)):
start = max(0, i - 10)
end = min(len(avg_tmax), i + 11)
rolling_avg.append(sum(avg_tmax[start:end]) / (end - start))
plt.plot(days, avg_tmax, label='Täglich')
plt.plot(days, rolling_avg, label='21-Tage')
plt.xlabel('Tag des Jahres')
plt.ylabel('Durchschnitt TMAX (°C)')
plt.title(f'Durchschnitt TMAX {station_name}')
plt.legend()
plt.show()
# g) Durchschnitt TMAX und TMIN pro Jahr für Station
def plot_temp_year(station_name):
df_temp = spark.sql(f"""
SELECT
year(d.date) AS year,
AVG(CASE WHEN d.element = 'TMAX' THEN d.value END) / 10.0 AS avg_tmax,
AVG(CASE WHEN d.element = 'TMIN' THEN d.value END) / 10.0 AS avg_tmin
FROM ghcnddata d
JOIN ghcndstations s
ON d.stationid = s.stationid
WHERE
trim(upper(s.stationname)) = '{station_name.upper()}'
AND d.element IN ('TMAX', 'TMIN')
GROUP BY year(d.date)
ORDER BY year
""").collect()
if not df_temp:
print(f"Keine Temperaturdaten für Station '{station_name}'.")
return
years = [row['year'] for row in df_temp]
avg_tmax = [row['avg_tmax'] for row in df_temp]
avg_tmin = [row['avg_tmin'] for row in df_temp]
plt.plot(years, avg_tmax, 'r', label='TMAX')
plt.plot(years, avg_tmin, 'b', label='TMIN')
plt.xlabel('Jahr')
plt.ylabel('Temperatur (°C)')
plt.title(f'Temperatur {station_name}')
plt.legend()
plt.show()
# h) Durchschnitt TMAX pro Jahr und 20-Jahre Durchschnitt
def plot_tmax_trend(station_name):
df_trend = spark.sql(f"""
SELECT
year(d.date) AS year,
AVG(d.value) / 10.0 AS avg_tmax
FROM ghcnddata d
JOIN ghcndstations s
ON d.stationid = s.stationid
WHERE
trim(upper(s.stationname)) = '{station_name.upper()}'
AND d.element = 'TMAX'
GROUP BY year(d.date)
ORDER BY year
""").collect()
if not df_trend:
print(f"Keine TMAX-Daten für Station '{station_name}'.")
return
years = [row['year'] for row in df_trend]
avg_tmax = [row['avg_tmax'] for row in df_trend]
rolling_avg = []
for i in range(len(avg_tmax)):
start = max(0, i - 9)
end = min(len(avg_tmax), i + 11)
rolling_avg.append(sum(avg_tmax[start:end]) / (end - start))
plt.plot(years, avg_tmax, label='Jährlich')
plt.plot(years, rolling_avg, label='20-Jahre')
plt.xlabel('Jahr')
plt.ylabel('Durchschnitt TMAX (°C)')
plt.title(f'TMAX Trend {station_name}')
plt.legend()
plt.show()
# i) Korrelation TMIN und TMAX pro Jahr
def plot_corr_temp(station_name):
df_corr = spark.sql(f"""
SELECT
year(date) AS year,
corr(tmin_val, tmax_val) AS correlation
FROM (
SELECT
d.date,
MAX(CASE WHEN d.element = 'TMIN' THEN d.value END) AS tmin_val,
MAX(CASE WHEN d.element = 'TMAX' THEN d.value END) AS tmax_val
FROM ghcnddata d
JOIN ghcndstations s
ON d.stationid = s.stationid
WHERE
trim(upper(s.stationname)) = '{station_name.upper()}'
AND d.element IN ('TMIN', 'TMAX')
GROUP BY d.date
)
GROUP BY year(date)
ORDER BY year
""").collect()
if not df_corr:
print(f"Keine Korrelationsdaten für Station '{station_name}'.")
return
years = [row['year'] for row in df_corr]
correlation = [row['correlation'] for row in df_corr]
plt.plot(years, correlation)
plt.xlabel('Jahr')
plt.ylabel('Korrelation TMIN-TMAX')
plt.title(f'Korrelation {station_name}')
plt.show()
def main(scon, spark):
# Daten laden
ghcnd_stations.read_ghcnd_from_parquet(spark)
# a) Liste aller Stationen
print("a)")
get_all_stations()
# b) Anzahl Stationen je Land
print("b)")
get_station_count_per_country()
# c) Stationen in Deutschland
print("c)")
get_german_stations()
# d) Plot für Kempten, Hohenpeissenberg, Zugspitze
print("d)")
plot_temp_day('KEMPTEN', 2024)
plot_temp_day('HOHENPEISSENBERG', 2024)
plot_temp_day('ZUGSPITZE', 2024)
# e) Niederschlag
print("e)")
plot_precip_year('KEMPTEN')
plot_precip_year('HOHENPEISSENBERG')
plot_precip_year('ZUGSPITZE')
# f) Durchschnitt TMAX
print("f)")
plot_avg_tmax_day('KEMPTEN')
plot_avg_tmax_day('HOHENPEISSENBERG')
plot_avg_tmax_day('ZUGSPITZE')
# g) Temperatur pro Jahr
print("g)")
plot_temp_year('KEMPTEN')
plot_temp_year('HOHENPEISSENBERG')
plot_temp_year('ZUGSPITZE')
# h) TMAX Trend
print("h)")
plot_tmax_trend('KEMPTEN')
plot_tmax_trend('HOHENPEISSENBERG')
plot_tmax_trend('ZUGSPITZE')
# i) Korrelation
print("i)")
plot_corr_temp('KEMPTEN')
plot_corr_temp('HOHENPEISSENBERG')
plot_corr_temp('ZUGSPITZE')
if __name__ == "__main__":
main(scon, spark)