# Mirror of https://github.com/Vale54321/BigData.git (synced 2025-12-11 09:59:33 +01:00)
from sparkstart import scon, spark
|
|
import ghcnd_stations
|
|
import matplotlib.pyplot as plt
|
|
|
|
# a) Liste aller Stationen sortiert nach Stationsname
|
|
def get_all_stations():
    """Print every station from ghcndstations, sorted by station name.

    Exercise a): list of all stations ordered alphabetically.
    """
    spark.sql("""
        SELECT *
        FROM ghcndstations
        ORDER BY stationname
    """).show(truncate=False)
|
|
|
|
|
|
# b) Anzahl der Stationen je Land
|
|
def get_station_count_per_country():
    """Print the number of stations per country, most stations first.

    Exercise b): joins stations with countries so the country name can be
    shown next to its GHCN country code.
    """
    counts = spark.sql("""
        SELECT
            c.countrycode,
            c.countryname,
            COUNT(s.stationid) AS count
        FROM ghcndstations s
        JOIN ghcndcountries c
            ON s.countrycode = c.countrycode
        GROUP BY
            c.countrycode,
            c.countryname
        ORDER BY count DESC
    """)
    counts.show(truncate=False)
|
|
|
|
|
|
# c) Stationen in Deutschland
|
|
def get_german_stations(country_code='GM'):
    """Print all stations of one country, sorted by station name.

    Exercise c): stations in Germany. Generalized so any country can be
    queried; the default keeps the original behavior.

    Args:
        country_code: GHCN country code (default 'GM' = Germany).
    """
    # Escape single quotes so the code cannot break the SQL string literal.
    safe_code = country_code.replace("'", "''")
    spark.sql(f"""
        SELECT *
        FROM ghcndstations
        WHERE countrycode = '{safe_code}'
        ORDER BY stationname
    """).show(truncate=False)
|
|
|
|
|
|
# d) Plot TMAX und TMIN für Station und Jahr
|
|
def plot_temp_day(station_name, year):
    """Plot daily TMAX and TMIN (°C) for one station and one year.

    Exercise d). Station matching is case-insensitive and trims the stored
    name. GHCN values are tenths of °C, hence the division by 10.

    Args:
        station_name: station name as stored in ghcndstations.
        year: calendar year to plot.
    """
    # Escape single quotes so a name like "COEUR D'ALENE" can neither break
    # nor inject into the SQL literal; coerce year to int for the same reason.
    safe_name = station_name.upper().replace("'", "''")
    rows = spark.sql(f"""
        SELECT
            d.date,
            d.value / 10.0 AS temp,
            d.element
        FROM ghcnddata d
        JOIN ghcndstations s
            ON d.stationid = s.stationid
        WHERE
            trim(upper(s.stationname)) = '{safe_name}'
            AND year(d.date) = {int(year)}
            AND d.element IN ('TMAX', 'TMIN')
        ORDER BY d.date
    """).collect()

    if not rows:
        print(f"Keine Daten für Station '{station_name}' im Jahr {year} (oder Station nicht gefunden).")
        return

    # One reading per element per date, keyed by date.
    tmax_data = {row['date']: row['temp'] for row in rows if row['element'] == 'TMAX'}
    tmin_data = {row['date']: row['temp'] for row in rows if row['element'] == 'TMIN'}

    # Union of all dates; a missing reading becomes None, which matplotlib
    # renders as a gap in the line.
    dates = sorted(set(tmax_data) | set(tmin_data))
    tmax_vals = [tmax_data.get(d) for d in dates]
    tmin_vals = [tmin_data.get(d) for d in dates]
    day_of_year = [d.timetuple().tm_yday for d in dates]

    plt.plot(day_of_year, tmax_vals, 'r', label='TMAX')
    plt.plot(day_of_year, tmin_vals, 'b', label='TMIN')
    plt.xlabel('Tag des Jahres')
    plt.ylabel('Temperatur (°C)')
    plt.title(f'{station_name} {year}')
    plt.legend()
    plt.show()
|
|
|
|
|
|
# e) Gesamt-Niederschlag pro Jahr für Station
|
|
def plot_precip_year(station_name):
    """Bar-plot total precipitation (mm) per year for one station.

    Exercise e). PRCP is stored in tenths of mm, hence the division by 10.

    Args:
        station_name: station name as stored in ghcndstations.
    """
    # Escape single quotes so the station name cannot break the SQL literal.
    safe_name = station_name.upper().replace("'", "''")
    rows = spark.sql(f"""
        SELECT
            year(d.date) AS year,
            SUM(d.value) / 10.0 AS total_precip
        FROM ghcnddata d
        JOIN ghcndstations s
            ON d.stationid = s.stationid
        WHERE
            trim(upper(s.stationname)) = '{safe_name}'
            AND d.element = 'PRCP'
        GROUP BY year(d.date)
        ORDER BY year
    """).collect()

    if not rows:
        print(f"Keine Niederschlagsdaten für Station '{station_name}'.")
        return

    years = [row['year'] for row in rows]
    total_precip = [row['total_precip'] for row in rows]

    plt.bar(years, total_precip)
    plt.xlabel('Jahr')
    plt.ylabel('Niederschlag (mm)')
    plt.title(f'Gesamt-Niederschlag {station_name}')
    plt.show()
|
|
|
|
|
|
# f) Durchschnitt TMAX pro Tag des Jahres, mit 21-Tage Durchschnitt
|
|
def plot_avg_tmax_day(station_name):
    """Plot the multi-year average TMAX per day-of-year plus a 21-day
    centered moving average.

    Exercise f). TMAX is stored in tenths of °C, hence the division by 10.

    Args:
        station_name: station name as stored in ghcndstations.
    """
    # Escape single quotes so the station name cannot break the SQL literal.
    safe_name = station_name.upper().replace("'", "''")
    rows = spark.sql(f"""
        SELECT
            dayofyear(d.date) AS day,
            AVG(d.value) / 10.0 AS avg_tmax
        FROM ghcnddata d
        JOIN ghcndstations s
            ON d.stationid = s.stationid
        WHERE
            trim(upper(s.stationname)) = '{safe_name}'
            AND d.element = 'TMAX'
        GROUP BY dayofyear(d.date)
        ORDER BY day
    """).collect()

    if not rows:
        print(f"Keine TMAX-Daten für Station '{station_name}'.")
        return

    days = [row['day'] for row in rows]
    avg_tmax = [row['avg_tmax'] for row in rows]

    # TODO: do this in SQL (window function AVG OVER ROWS BETWEEN 10
    # PRECEDING AND 10 FOLLOWING).
    # 21-day centered moving average (10 days before, the day itself,
    # 10 days after); windows are shortened at both ends of the year.
    rolling_avg = []
    for i in range(len(avg_tmax)):
        start = max(0, i - 10)
        end = min(len(avg_tmax), i + 11)
        rolling_avg.append(sum(avg_tmax[start:end]) / (end - start))

    plt.plot(days, avg_tmax, label='Täglich')
    plt.plot(days, rolling_avg, label='21-Tage')
    plt.xlabel('Tag des Jahres')
    plt.ylabel('Durchschnitt TMAX (°C)')
    plt.title(f'Durchschnitt TMAX {station_name}')
    plt.legend()
    plt.show()
|
|
|
|
|
|
# g) Durchschnitt TMAX und TMIN pro Jahr für Station
|
|
def plot_temp_year(station_name):
    """Plot average TMAX and TMIN (°C) per year for one station.

    Exercise g). Both averages are computed in one pass using conditional
    aggregation (AVG ignores the NULLs produced by the CASE expressions).

    Args:
        station_name: station name as stored in ghcndstations.
    """
    # Escape single quotes so the station name cannot break the SQL literal.
    safe_name = station_name.upper().replace("'", "''")
    rows = spark.sql(f"""
        SELECT
            year(d.date) AS year,
            AVG(CASE WHEN d.element = 'TMAX' THEN d.value END) / 10.0 AS avg_tmax,
            AVG(CASE WHEN d.element = 'TMIN' THEN d.value END) / 10.0 AS avg_tmin
        FROM ghcnddata d
        JOIN ghcndstations s
            ON d.stationid = s.stationid
        WHERE
            trim(upper(s.stationname)) = '{safe_name}'
            AND d.element IN ('TMAX', 'TMIN')
        GROUP BY year(d.date)
        ORDER BY year
    """).collect()

    if not rows:
        print(f"Keine Temperaturdaten für Station '{station_name}'.")
        return

    years = [row['year'] for row in rows]
    avg_tmax = [row['avg_tmax'] for row in rows]
    avg_tmin = [row['avg_tmin'] for row in rows]

    plt.plot(years, avg_tmax, 'r', label='TMAX')
    plt.plot(years, avg_tmin, 'b', label='TMIN')
    plt.xlabel('Jahr')
    plt.ylabel('Temperatur (°C)')
    plt.title(f'Temperatur {station_name}')
    plt.legend()
    plt.show()
|
|
|
|
|
|
# h) Durchschnitt TMAX pro Jahr und 20-Jahre Durchschnitt
|
|
def plot_tmax_trend(station_name):
    """Plot yearly average TMAX plus a 20-year moving average.

    Exercise h). TMAX is stored in tenths of °C, hence the division by 10.

    Args:
        station_name: station name as stored in ghcndstations.
    """
    # Escape single quotes so the station name cannot break the SQL literal.
    safe_name = station_name.upper().replace("'", "''")
    rows = spark.sql(f"""
        SELECT
            year(d.date) AS year,
            AVG(d.value) / 10.0 AS avg_tmax
        FROM ghcnddata d
        JOIN ghcndstations s
            ON d.stationid = s.stationid
        WHERE
            trim(upper(s.stationname)) = '{safe_name}'
            AND d.element = 'TMAX'
        GROUP BY year(d.date)
        ORDER BY year
    """).collect()

    if not rows:
        print(f"Keine TMAX-Daten für Station '{station_name}'.")
        return

    years = [row['year'] for row in rows]
    avg_tmax = [row['avg_tmax'] for row in rows]

    # 20-year moving average: a slightly asymmetric window of the 9 years
    # before, the year itself, and the 10 years after (20 values in total);
    # windows are shortened at both ends of the series.
    rolling_avg = []
    for i in range(len(avg_tmax)):
        start = max(0, i - 9)
        end = min(len(avg_tmax), i + 11)
        rolling_avg.append(sum(avg_tmax[start:end]) / (end - start))

    plt.plot(years, avg_tmax, label='Jährlich')
    plt.plot(years, rolling_avg, label='20-Jahre')
    plt.xlabel('Jahr')
    plt.ylabel('Durchschnitt TMAX (°C)')
    plt.title(f'TMAX Trend {station_name}')
    plt.legend()
    plt.show()
|
|
|
|
|
|
# i) Korrelation TMIN und TMAX pro Jahr
|
|
def plot_corr_temp(station_name):
    """Plot the per-year Pearson correlation between daily TMIN and TMAX.

    Exercise i). The inner query pivots each date's TMIN/TMAX readings onto
    one row (MAX over the CASE expressions picks the single non-NULL value);
    the outer query correlates the pairs within each year.

    Args:
        station_name: station name as stored in ghcndstations.
    """
    # Escape single quotes so the station name cannot break the SQL literal.
    safe_name = station_name.upper().replace("'", "''")
    rows = spark.sql(f"""
        SELECT
            year(date) AS year,
            corr(tmin_val, tmax_val) AS correlation
        FROM (
            SELECT
                d.date,
                MAX(CASE WHEN d.element = 'TMIN' THEN d.value END) AS tmin_val,
                MAX(CASE WHEN d.element = 'TMAX' THEN d.value END) AS tmax_val
            FROM ghcnddata d
            JOIN ghcndstations s
                ON d.stationid = s.stationid
            WHERE
                trim(upper(s.stationname)) = '{safe_name}'
                AND d.element IN ('TMIN', 'TMAX')
            GROUP BY d.date
        )
        GROUP BY year(date)
        ORDER BY year
    """).collect()

    if not rows:
        print(f"Keine Korrelationsdaten für Station '{station_name}'.")
        return

    years = [row['year'] for row in rows]
    correlation = [row['correlation'] for row in rows]

    plt.plot(years, correlation)
    plt.xlabel('Jahr')
    plt.ylabel('Korrelation TMIN-TMAX')
    plt.title(f'Korrelation {station_name}')
    plt.show()
|
|
|
|
|
|
def main(scon, spark):
    """Load the GHCN parquet tables and run every exercise (a–i) in order.

    Exercises d–i are executed for the same three stations each time.
    """
    # Register the GHCN tables for Spark SQL.
    ghcnd_stations.read_ghcnd_from_parquet(spark)

    print("a)")
    get_all_stations()

    print("b)")
    get_station_count_per_country()

    print("c)")
    get_german_stations()

    stations = ('KEMPTEN', 'HOHENPEISSENBERG', 'ZUGSPITZE')

    print("d)")
    for name in stations:
        plot_temp_day(name, 2024)

    print("e)")
    for name in stations:
        plot_precip_year(name)

    print("f)")
    for name in stations:
        plot_avg_tmax_day(name)

    print("g)")
    for name in stations:
        plot_temp_year(name)

    print("h)")
    for name in stations:
        plot_tmax_trend(name)

    print("i)")
    for name in stations:
        plot_corr_temp(name)
|
|
|
|
|
|
# Script entry point: run all exercises only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main(scon, spark)