Commit 2025-11-14 10:01:38 +01:00
6 changed files with 273 additions and 124 deletions


@@ -10,7 +10,7 @@ from pyspark.sql import SparkSession
 # connect to cluster
 conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-conf.set("spark.executor.memory", '32g')
+conf.set("spark.executor.memory", '16g')
 conf.set("spark.driver.memory", '8g')
 conf.set("spark.cores.max", "40")
 scon = SparkContext(conf=conf)
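
Note: halving spark.executor.memory to 16g only affects contexts created after this change; the two new sparkstart.py files added below still request 32g. A quick sanity check on a live context (a sketch, not part of the commit):

    # Inspect the configuration the running SparkContext actually uses.
    for key, value in scon.getConf().getAll():
        if key in ("spark.executor.memory", "spark.driver.memory", "spark.cores.max"):
            print(key, "=", value)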


@@ -1,3 +1,5 @@
+from sparkstart import scon, spark
+
 def a(scon, spark, path):
     rdd = scon.textFile(path)
     return rdd

@@ -49,11 +51,11 @@ def e(scon,spark, path, top_n=20):
     return d(rdd, top_n)

 def main(scon, spark):
-    """
-    main(scon, spark)
-    """
     rdd = a(scon, spark, "/data/texte/test/robinsonCrusoe.txt")
     b(rdd)
     c(rdd)
     d(rdd)
     e(scon, spark, "/data/texte/test/DonQuijote.txt")
+
+if __name__ == "__main__":
+    main(scon, spark)
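
Note: with the module-level import and the __main__ guard, the script can be run directly or imported without side effects. A sketch of interactive reuse, assuming the file is importable as aufgabe6 (the real module name is not shown in this diff):

    import aufgabe6  # hypothetical module name; importing no longer runs main()

    rdd = aufgabe6.a(aufgabe6.scon, aufgabe6.spark, "/data/texte/test/robinsonCrusoe.txt")
    print(rdd.count())  # number of lines in the sample text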

Aufgabe 6/sparkstart.py (new file, +22)

@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+"""
+Create a Spark configuration.
+"""
+
+from pyspark import SparkConf, SparkContext
+from pyspark.sql import SparkSession
+
+# connect to cluster
+conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
+conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+conf.set("spark.executor.memory", '32g')
+conf.set("spark.driver.memory", '8g')
+conf.set("spark.cores.max", "40")
+
+scon = SparkContext(conf=conf)
+
+spark = SparkSession \
+    .builder \
+    .appName("Python Spark SQL") \
+    .getOrCreate()
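
Note: because SparkContext(conf=conf) runs first, SparkSession.builder.getOrCreate() attaches to that existing context, and the builder's appName("Python Spark SQL") is effectively ignored. A single-entry-point sketch (an alternative, not what the commit does):

    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    # The session owns the context; no separate SparkContext(conf) needed.
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    scon = spark.sparkContext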


@@ -1,5 +1,7 @@
-def readAllLanguages(scone, lang_file):
-    return (scone.textFile(lang_file)
+from sparkstart import scon, spark
+
+def readAllLanguages(scon, lang_file):
+    return (scon.textFile(lang_file)
             .map(lambda line: line.strip().split(","))
             .filter(lambda x: len(x) == 2)
             .map(lambda x: (x[1].lower(), x[0].lower())))

@@ -57,9 +59,6 @@ def count_texts_per_language(lang_detected_rdd):
 def detect_languages(scon, text_dir, lang_file, partitions=1000):
-    """
-    detect_languages(scon, "/data/texte/txt", "/data/texte/languages.txt")
-    """
     import time
     start_time = time.time()

@@ -89,4 +88,12 @@ def detect_languages(scon, text_dir, lang_file, partitions=1000):
     print("\nGesamtanzahl Texte pro Sprache:")
     for lang, count in summary:
         print(f"{lang}: {count}")
+
+def main(scon, spark):
+    detect_languages(scon,
+                     text_dir="/data/texte/txt",
+                     lang_file="/data/texte/languages.txt")
+
+if __name__ == "__main__":
+    main(scon, spark)
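
Note: the new main() pins the production paths, but the keyword arguments make it easy to point the same entry point at a smaller input. A hedged smoke-test example (/data/texte/test is taken from the Aufgabe 6 script, not from this file):

    from sparkstart import scon, spark

    detect_languages(scon,
                     text_dir="/data/texte/test",    # assumed sample directory
                     lang_file="/data/texte/languages.txt",
                     partitions=50)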

Aufgabe 7/sparkstart.py (new file, +22)

@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+"""
+Create a Spark configuration.
+"""
+
+from pyspark import SparkConf, SparkContext
+from pyspark.sql import SparkSession
+
+# connect to cluster
+conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
+conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+conf.set("spark.executor.memory", '32g')
+conf.set("spark.driver.memory", '8g')
+conf.set("spark.cores.max", "40")
+
+scon = SparkContext(conf=conf)
+
+spark = SparkSession \
+    .builder \
+    .appName("Python Spark SQL") \
+    .getOrCreate()


@@ -1,220 +1,316 @@
 from sparkstart import scon, spark
 import ghcnd_stations
 import matplotlib.pyplot as plt
-import time

 # a) List of all stations, sorted by station name
 def get_all_stations():
-    start = time.time()
-    result = spark.sql("SELECT * FROM stations ORDER BY name")
-    result.show()
-    end = time.time()
-    print(f"Zeit: {end - start}")
-    # Second run
-    start = time.time()
-    result = spark.sql("SELECT * FROM stations ORDER BY name")
-    result.show()
-    end = time.time()
-    print(f"Zeit zweite Ausführung: {end - start}")
+    result = spark.sql("""
+        SELECT *
+        FROM ghcndstations
+        ORDER BY stationname
+    """)
+    result.show(truncate=False)
 # b) Number of stations per country
 def get_station_count_per_country():
-    start = time.time()
     result = spark.sql("""
-        SELECT c.country_code, c.name, COUNT(s.id) as count
-        FROM stations s
-        JOIN ghcndcountries c ON s.country = c.country_code
-        GROUP BY c.country_code, c.name
+        SELECT
+            c.countrycode,
+            c.countryname,
+            COUNT(s.stationid) AS count
+        FROM ghcndstations s
+        JOIN ghcndcountries c
+            ON s.countrycode = c.countrycode
+        GROUP BY
+            c.countrycode,
+            c.countryname
         ORDER BY count DESC
     """)
     result.show(truncate=False)
-    end = time.time()
-    print(f"Zeit: {end - start}")
-    # Second run
-    start = time.time()
-    result = spark.sql("""
-        SELECT c.country_code, c.name, COUNT(s.id) as count
-        FROM stations s
-        JOIN ghcndcountries c ON s.country = c.country_code
-        GROUP BY c.country_code, c.name
-        ORDER BY count DESC
-    """)
-    result.show(truncate=False)
-    end = time.time()
-    print(f"Zeit zweite: {end - start}")
 # c) Stations in Germany
 def get_german_stations():
-    start = time.time()
-    result = spark.sql("SELECT * FROM stations WHERE country = 'GM' ORDER BY name")
-    result.show()
-    end = time.time()
-    print(f"Zeit: {end - start}")
-    # Second run
-    start = time.time()
-    result = spark.sql("SELECT * FROM stations WHERE country = 'GM' ORDER BY name")
-    result.show()
-    end = time.time()
-    print(f"Zeit zweite: {end - start}")
+    result = spark.sql("""
+        SELECT *
+        FROM ghcndstations
+        WHERE countrycode = 'GM'
+        ORDER BY stationname
+    """)
+    result.show(truncate=False)
 # d) Plot TMAX and TMIN for a station and year
 def plot_temp_day(station_name, year):
-    # Find the station ID
-    station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0]
-    # Filter the data
     df_filtered = spark.sql(f"""
-        SELECT date, TMAX, TMIN FROM ghcnd_data
-        WHERE station = '{station_id}' AND year(date) = {year}
-        ORDER BY date
-    """).toPandas()
-    # Convert tenths of a degree to °C
-    df_filtered['TMAX'] /= 10
-    df_filtered['TMIN'] /= 10
-    # Day of the year
-    df_filtered['day_of_year'] = df_filtered['date'].dt.dayofyear
-    plt.plot(df_filtered['day_of_year'], df_filtered['TMAX'], 'r', label='TMAX')
-    plt.plot(df_filtered['day_of_year'], df_filtered['TMIN'], 'b', label='TMIN')
+        SELECT
+            d.date,
+            d.value / 10.0 AS temp,
+            d.element
+        FROM ghcnddata d
+        JOIN ghcndstations s
+            ON d.stationid = s.stationid
+        WHERE
+            trim(upper(s.stationname)) = '{station_name.upper()}'
+            AND year(d.date) = {year}
+            AND d.element IN ('TMAX', 'TMIN')
+        ORDER BY d.date
+    """).collect()
+
+    if not df_filtered:
+        print(f"Keine Daten für Station '{station_name}' im Jahr {year} (oder Station nicht gefunden).")
+        return
+
+    # Organize the rows into per-element dicts keyed by date
+    tmax_data = {row['date']: row['temp'] for row in df_filtered if row['element'] == 'TMAX'}
+    tmin_data = {row['date']: row['temp'] for row in df_filtered if row['element'] == 'TMIN'}
+    # Sort by date
+    dates = sorted(set(tmax_data.keys()) | set(tmin_data.keys()))
+    tmax_vals = [tmax_data.get(d, None) for d in dates]
+    tmin_vals = [tmin_data.get(d, None) for d in dates]
+    day_of_year = [d.timetuple().tm_yday for d in dates]
+
+    plt.plot(day_of_year, tmax_vals, 'r', label='TMAX')
+    plt.plot(day_of_year, tmin_vals, 'b', label='TMIN')
     plt.xlabel('Tag des Jahres')
     plt.ylabel('Temperatur (°C)')
     plt.title(f'{station_name} {year}')
     plt.legend()
     plt.show()
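
Note: interpolating station_name into the SQL via an f-string works for these fixed call sites but breaks for names containing quotes. The same filter without string interpolation, as a DataFrame-API sketch (table names taken from the diff above; temp_rows is a hypothetical helper):

    from pyspark.sql import functions as F
    from sparkstart import spark

    def temp_rows(station_name, year):
        # Mirrors the WHERE clause of plot_temp_day without building SQL strings.
        d = spark.table("ghcnddata").alias("d")
        s = spark.table("ghcndstations").alias("s")
        return (d.join(s, F.col("d.stationid") == F.col("s.stationid"))
                 .where(F.trim(F.upper(F.col("s.stationname"))) == station_name.upper())
                 .where(F.year("d.date") == year)
                 .where(F.col("d.element").isin("TMAX", "TMIN"))
                 .select("d.date", (F.col("d.value") / 10.0).alias("temp"), "d.element"))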
 # e) Total precipitation per year for a station
 def plot_precip_year(station_name):
-    station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0]
     df_precip = spark.sql(f"""
-        SELECT year(date) as year, SUM(PRCP)/10 as total_precip
-        FROM ghcnd_data
-        WHERE station = '{station_id}'
-        GROUP BY year(date)
+        SELECT
+            year(d.date) AS year,
+            SUM(d.value) / 10.0 AS total_precip
+        FROM ghcnddata d
+        JOIN ghcndstations s
+            ON d.stationid = s.stationid
+        WHERE
+            trim(upper(s.stationname)) = '{station_name.upper()}'
+            AND d.element = 'PRCP'
+        GROUP BY year(d.date)
         ORDER BY year
-    """).toPandas()
-    plt.bar(df_precip['year'], df_precip['total_precip'])
+    """).collect()
+
+    if not df_precip:
+        print(f"Keine Niederschlagsdaten für Station '{station_name}'.")
+        return
+
+    years = [row['year'] for row in df_precip]
+    total_precip = [row['total_precip'] for row in df_precip]
+    plt.bar(years, total_precip)
     plt.xlabel('Jahr')
     plt.ylabel('Niederschlag (mm)')
     plt.title(f'Gesamt-Niederschlag {station_name}')
     plt.show()
 # f) Average TMAX per day of the year, with 21-day moving average
 def plot_avg_tmax_day(station_name):
-    station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0]
     df_avg = spark.sql(f"""
-        SELECT dayofyear(date) as day, AVG(TMAX)/10 as avg_tmax
-        FROM ghcnd_data
-        WHERE station = '{station_id}'
-        GROUP BY dayofyear(date)
+        SELECT
+            dayofyear(d.date) AS day,
+            AVG(d.value) / 10.0 AS avg_tmax
+        FROM ghcnddata d
+        JOIN ghcndstations s
+            ON d.stationid = s.stationid
+        WHERE
+            trim(upper(s.stationname)) = '{station_name.upper()}'
+            AND d.element = 'TMAX'
+        GROUP BY dayofyear(d.date)
         ORDER BY day
-    """).toPandas()
-    # 21-day moving average
-    df_avg['rolling_avg'] = df_avg['avg_tmax'].rolling(21, center=True).mean()
-    plt.plot(df_avg['day'], df_avg['avg_tmax'], label='Täglich')
-    plt.plot(df_avg['day'], df_avg['rolling_avg'], label='21-Tage')
+    """).collect()
+
+    if not df_avg:
+        print(f"Keine TMAX-Daten für Station '{station_name}'.")
+        return
+
+    days = [row['day'] for row in df_avg]
+    avg_tmax = [row['avg_tmax'] for row in df_avg]
+    # 21-day moving average (10 days before, the day itself, 10 days after)
+    rolling_avg = []
+    for i in range(len(avg_tmax)):
+        start = max(0, i - 10)
+        end = min(len(avg_tmax), i + 11)
+        rolling_avg.append(sum(avg_tmax[start:end]) / (end - start))
+    plt.plot(days, avg_tmax, label='Täglich')
+    plt.plot(days, rolling_avg, label='21-Tage')
     plt.xlabel('Tag des Jahres')
     plt.ylabel('Durchschnitt TMAX (°C)')
     plt.title(f'Durchschnitt TMAX {station_name}')
     plt.legend()
     plt.show()
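
Note: the hand-rolled window is not identical to the pandas rolling(21, center=True).mean() it replaces: pandas yields NaN where the window is incomplete, while the loop averages a truncated window at the edges. A toy check of the truncation logic (plain Python, no Spark needed):

    vals = [10.0, 12.0, 11.0, 13.0, 14.0]
    half = 2  # toy half-window; the diff uses 10, i.e. a 21-day window

    smoothed = []
    for i in range(len(vals)):
        lo, hi = max(0, i - half), min(len(vals), i + half + 1)
        smoothed.append(sum(vals[lo:hi]) / (hi - lo))

    print(smoothed[0])  # 11.0 — mean of vals[0:3], a truncated 3-element window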
 # g) Average TMAX and TMIN per year for a station
 def plot_temp_year(station_name):
-    station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0]
     df_temp = spark.sql(f"""
-        SELECT year(date) as year, AVG(TMAX)/10 as avg_tmax, AVG(TMIN)/10 as avg_tmin
-        FROM ghcnd_data
-        WHERE station = '{station_id}'
-        GROUP BY year(date)
+        SELECT
+            year(d.date) AS year,
+            AVG(CASE WHEN d.element = 'TMAX' THEN d.value END) / 10.0 AS avg_tmax,
+            AVG(CASE WHEN d.element = 'TMIN' THEN d.value END) / 10.0 AS avg_tmin
+        FROM ghcnddata d
+        JOIN ghcndstations s
+            ON d.stationid = s.stationid
+        WHERE
+            trim(upper(s.stationname)) = '{station_name.upper()}'
+            AND d.element IN ('TMAX', 'TMIN')
+        GROUP BY year(d.date)
         ORDER BY year
-    """).toPandas()
-    plt.plot(df_temp['year'], df_temp['avg_tmax'], 'r', label='TMAX')
-    plt.plot(df_temp['year'], df_temp['avg_tmin'], 'b', label='TMIN')
+    """).collect()
+
+    if not df_temp:
+        print(f"Keine Temperaturdaten für Station '{station_name}'.")
+        return
+
+    years = [row['year'] for row in df_temp]
+    avg_tmax = [row['avg_tmax'] for row in df_temp]
+    avg_tmin = [row['avg_tmin'] for row in df_temp]
+    plt.plot(years, avg_tmax, 'r', label='TMAX')
+    plt.plot(years, avg_tmin, 'b', label='TMIN')
     plt.xlabel('Jahr')
     plt.ylabel('Temperatur (°C)')
     plt.title(f'Temperatur {station_name}')
     plt.legend()
     plt.show()
 # h) Average TMAX per year, with 20-year moving average
 def plot_tmax_trend(station_name):
-    station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0]
     df_trend = spark.sql(f"""
-        SELECT year(date) as year, AVG(TMAX)/10 as avg_tmax
-        FROM ghcnd_data
-        WHERE station = '{station_id}'
-        GROUP BY year(date)
+        SELECT
+            year(d.date) AS year,
+            AVG(d.value) / 10.0 AS avg_tmax
+        FROM ghcnddata d
+        JOIN ghcndstations s
+            ON d.stationid = s.stationid
+        WHERE
+            trim(upper(s.stationname)) = '{station_name.upper()}'
+            AND d.element = 'TMAX'
+        GROUP BY year(d.date)
         ORDER BY year
-    """).toPandas()
-    # 20-year moving average
-    df_trend['rolling_avg'] = df_trend['avg_tmax'].rolling(20, center=True).mean()
-    plt.plot(df_trend['year'], df_trend['avg_tmax'], label='Jährlich')
-    plt.plot(df_trend['year'], df_trend['rolling_avg'], label='20-Jahre')
+    """).collect()
+
+    if not df_trend:
+        print(f"Keine TMAX-Daten für Station '{station_name}'.")
+        return
+
+    years = [row['year'] for row in df_trend]
+    avg_tmax = [row['avg_tmax'] for row in df_trend]
+    # 20-year moving average (positions i-9 .. i+10; asymmetric, since 20 is even)
+    rolling_avg = []
+    for i in range(len(avg_tmax)):
+        start = max(0, i - 9)
+        end = min(len(avg_tmax), i + 11)
+        rolling_avg.append(sum(avg_tmax[start:end]) / (end - start))
+    plt.plot(years, avg_tmax, label='Jährlich')
+    plt.plot(years, rolling_avg, label='20-Jahre')
     plt.xlabel('Jahr')
     plt.ylabel('Durchschnitt TMAX (°C)')
     plt.title(f'TMAX Trend {station_name}')
     plt.legend()
     plt.show()
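
Note: the loop averages over list positions, so missing years silently widen the effective time span of the 20-year window. Keeping the smoothing in SQL with a RANGE frame avoids that (a sketch; per_year_tmax is a hypothetical temp view holding the (year, avg_tmax) result of the query above):

    from sparkstart import spark

    trend = spark.sql("""
        SELECT
            year,
            avg_tmax,
            AVG(avg_tmax) OVER (
                ORDER BY year
                RANGE BETWEEN 9 PRECEDING AND 10 FOLLOWING
            ) AS rolling_avg
        FROM per_year_tmax
        ORDER BY year
    """).collect()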
 # i) Correlation of TMIN and TMAX per year
 def plot_corr_temp(station_name):
-    station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0]
     df_corr = spark.sql(f"""
-        SELECT year(date) as year, corr(TMIN, TMAX) as correlation
+        SELECT
+            year(date) AS year,
+            corr(tmin_val, tmax_val) AS correlation
         FROM (
-            SELECT date, TMIN, TMAX
-            FROM ghcnd_data
-            WHERE station = '{station_id}' AND TMIN IS NOT NULL AND TMAX IS NOT NULL
-        )
-        GROUP BY year(date)
+            SELECT
+                d.date,
+                MAX(CASE WHEN d.element = 'TMIN' THEN d.value END) AS tmin_val,
+                MAX(CASE WHEN d.element = 'TMAX' THEN d.value END) AS tmax_val
+            FROM ghcnddata d
+            JOIN ghcndstations s
+                ON d.stationid = s.stationid
+            WHERE
+                trim(upper(s.stationname)) = '{station_name.upper()}'
+                AND d.element IN ('TMIN', 'TMAX')
+            GROUP BY d.date
+        )
+        GROUP BY year(date)
         ORDER BY year
-    """).toPandas()
-    plt.plot(df_corr['year'], df_corr['correlation'])
+    """).collect()
+
+    if not df_corr:
+        print(f"Keine Korrelationsdaten für Station '{station_name}'.")
+        return
+
+    years = [row['year'] for row in df_corr]
+    correlation = [row['correlation'] for row in df_corr]
+    plt.plot(years, correlation)
     plt.xlabel('Jahr')
     plt.ylabel('Korrelation TMIN-TMAX')
     plt.title(f'Korrelation {station_name}')
     plt.show()
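
Note: corr here is the Pearson coefficient, and it only uses date rows where both pivoted values are non-NULL, so days with just one of TMIN/TMAX drop out automatically. A quick check of that NULL handling on an inline DataFrame (a sketch):

    from pyspark.sql import functions as F
    from sparkstart import spark

    toy = spark.createDataFrame(
        [(1.0, 2.0), (2.0, 4.0), (3.0, None)],  # last pair is incomplete
        ["tmin_val", "tmax_val"],
    )
    toy.agg(F.corr("tmin_val", "tmax_val")).show()  # computed from the two complete pairs: 1.0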
 def main(scon, spark):
     # Load the data
     ghcnd_stations.read_ghcnd_from_parquet(spark)

     # a) List of all stations
+    print("a)")
     get_all_stations()

     # b) Number of stations per country
+    print("b)")
     get_station_count_per_country()

     # c) Stations in Germany
+    print("c)")
     get_german_stations()

     # d) Plots for Kempten, Hohenpeissenberg, Zugspitze
+    print("d)")
-    plot_temp_day('KEMPTEN', 2020)
-    plot_temp_day('HOHENPEISSENBERG', 2020)
-    plot_temp_day('ZUGSPITZE', 2020)
+    plot_temp_day('KEMPTEN', 2024)
+    plot_temp_day('HOHENPEISSENBERG', 2024)
+    plot_temp_day('ZUGSPITZE', 2024)

     # e) Precipitation
+    print("e)")
     plot_precip_year('KEMPTEN')
     plot_precip_year('HOHENPEISSENBERG')
     plot_precip_year('ZUGSPITZE')

     # f) Average TMAX
+    print("f)")
     plot_avg_tmax_day('KEMPTEN')
     plot_avg_tmax_day('HOHENPEISSENBERG')
     plot_avg_tmax_day('ZUGSPITZE')

     # g) Temperature per year
+    print("g)")
     plot_temp_year('KEMPTEN')
     plot_temp_year('HOHENPEISSENBERG')
     plot_temp_year('ZUGSPITZE')

     # h) TMAX trend
+    print("h)")
     plot_tmax_trend('KEMPTEN')
     plot_tmax_trend('HOHENPEISSENBERG')
     plot_tmax_trend('ZUGSPITZE')

     # i) Correlation
+    print("i)")
     plot_corr_temp('KEMPTEN')
     plot_corr_temp('HOHENPEISSENBERG')
     plot_corr_temp('ZUGSPITZE')

 if __name__ == "__main__":
     main(scon, spark)
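
Note: every plot goes through plt.show(), which needs an interactive matplotlib backend; on a display-less cluster node, writing image files is the usual workaround (a sketch, not part of the commit):

    import matplotlib
    matplotlib.use("Agg")    # non-interactive backend; select it before importing pyplot
    import matplotlib.pyplot as plt

    # ...build a figure exactly as the plotting functions above do, then:
    plt.savefig("plot.png")  # hypothetical output file
    plt.close()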