diff --git a/Aufgabe 5/sparkstart.py b/Aufgabe 5/sparkstart.py
index bdb7010..6166989 100644
--- a/Aufgabe 5/sparkstart.py
+++ b/Aufgabe 5/sparkstart.py
@@ -10,7 +10,7 @@ from pyspark.sql import SparkSession
 # connect to cluster
 conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-conf.set("spark.executor.memory", '32g')
+conf.set("spark.executor.memory", '16g')
 conf.set("spark.driver.memory", '8g')
 conf.set("spark.cores.max", "40")
 scon = SparkContext(conf=conf)
diff --git a/Aufgabe 6/main.py b/Aufgabe 6/main.py
index b6ecd77..fa7bc0c 100644
--- a/Aufgabe 6/main.py
+++ b/Aufgabe 6/main.py
@@ -1,3 +1,5 @@
+from sparkstart import scon, spark
+
 def a(scon, spark, path):
     rdd = scon.textFile(path)
     return rdd
@@ -49,11 +51,11 @@ def e(scon,spark, path, top_n=20):
     return d(rdd, top_n)
 
 def main(scon, spark):
-    """
-    main(scon, spark)
-    """
     rdd = a(scon, spark, "/data/texte/test/robinsonCrusoe.txt")
     b(rdd)
     c(rdd)
     d(rdd)
-    e(scon, spark, "/data/texte/test/DonQuijote.txt")
\ No newline at end of file
+    e(scon, spark, "/data/texte/test/DonQuijote.txt")
+
+if __name__ == "__main__":
+    main(scon, spark)
\ No newline at end of file
diff --git a/Aufgabe 6/sparkstart.py b/Aufgabe 6/sparkstart.py
new file mode 100644
index 0000000..bdb7010
--- /dev/null
+++ b/Aufgabe 6/sparkstart.py
@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+
+"""
+Erzeugen einer Spark-Konfiguration
+"""
+
+from pyspark import SparkConf, SparkContext
+from pyspark.sql import SparkSession
+
+# connect to cluster
+conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
+conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+conf.set("spark.executor.memory", '32g')
+conf.set("spark.driver.memory", '8g')
+conf.set("spark.cores.max", "40")
+scon = SparkContext(conf=conf)
+
+
+spark = SparkSession \
+    .builder \
+    .appName("Python Spark SQL") \
+    .getOrCreate()
diff --git a/Aufgabe 7/main.py b/Aufgabe 7/main.py
index 03a02e9..d61fd61 100644
--- a/Aufgabe 7/main.py
+++ b/Aufgabe 7/main.py
@@ -1,5 +1,7 @@
-def readAllLanguages(scone, lang_file):
-    return (scone.textFile(lang_file)
+from sparkstart import scon, spark
+
+def readAllLanguages(scon, lang_file):
+    return (scon.textFile(lang_file)
             .map(lambda line: line.strip().split(","))
             .filter(lambda x: len(x) == 2)
             .map(lambda x: (x[1].lower(), x[0].lower())))
@@ -57,9 +59,6 @@ def count_texts_per_language(lang_detected_rdd):
 
 
 def detect_languages(scon, text_dir, lang_file, partitions=1000):
-    """
-    detect_languages(scon, "/data/texte/txt", "/data/texte/languages.txt")
-    """
     import time
     start_time = time.time()
 
@@ -89,4 +88,12 @@
 
     print("\nGesamtanzahl Texte pro Sprache:")
     for lang, count in summary:
-        print(f"{lang}: {count}")
\ No newline at end of file
+        print(f"{lang}: {count}")
+
+def main(scon, spark):
+    detect_languages(scon,
+                     text_dir="/data/texte/txt",
+                     lang_file="/data/texte/languages.txt",)
+
+if __name__ == "__main__":
+    main(scon, spark)
\ No newline at end of file
diff --git a/Aufgabe 7/sparkstart.py b/Aufgabe 7/sparkstart.py
new file mode 100644
index 0000000..bdb7010
--- /dev/null
+++ b/Aufgabe 7/sparkstart.py
@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+
+"""
+Erzeugen einer Spark-Konfiguration
+"""
+
+from pyspark import SparkConf, SparkContext
+from pyspark.sql import SparkSession
+
+# connect to cluster
+conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
+conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+conf.set("spark.executor.memory", '32g')
+conf.set("spark.driver.memory", '8g')
+conf.set("spark.cores.max", "40")
+scon = SparkContext(conf=conf)
+
+
+spark = SparkSession \
+    .builder \
+    .appName("Python Spark SQL") \
+    .getOrCreate()
diff --git a/Aufgabe 8/main.py b/Aufgabe 8/main.py
index 2cdc85d..360cbb7 100644
--- a/Aufgabe 8/main.py
+++ b/Aufgabe 8/main.py
@@ -1,220 +1,316 @@
 from sparkstart import scon, spark
 import ghcnd_stations
 import matplotlib.pyplot as plt
-import time
 
 # a) Liste aller Stationen sortiert nach Stationsname
 def get_all_stations():
-    start = time.time()
-    result = spark.sql("SELECT * FROM stations ORDER BY name")
-    result.show()
-    end = time.time()
-    print(f"Zeit: {end - start}")
-    # Zweite Ausführung
-    start = time.time()
-    result = spark.sql("SELECT * FROM stations ORDER BY name")
-    result.show()
-    end = time.time()
-    print(f"Zeit zweite Ausführung: {end - start}")
+    result = spark.sql("""
+        SELECT *
+        FROM ghcndstations
+        ORDER BY stationname
+    """)
+    result.show(truncate=False)
+
 
 # b) Anzahl der Stationen je Land
 def get_station_count_per_country():
-    start = time.time()
     result = spark.sql("""
-        SELECT c.country_code, c.name, COUNT(s.id) as count
-        FROM stations s
-        JOIN ghcndcountries c ON s.country = c.country_code
-        GROUP BY c.country_code, c.name
+        SELECT
+            c.countrycode,
+            c.countryname,
+            COUNT(s.stationid) AS count
+        FROM ghcndstations s
+        JOIN ghcndcountries c
+            ON s.countrycode = c.countrycode
+        GROUP BY
+            c.countrycode,
+            c.countryname
        ORDER BY count DESC
     """)
     result.show(truncate=False)
-    end = time.time()
-    print(f"Zeit: {end - start}")
-    # Zweite
-    start = time.time()
-    result = spark.sql("""
-        SELECT c.country_code, c.name, COUNT(s.id) as count
-        FROM stations s
-        JOIN ghcndcountries c ON s.country = c.country_code
-        GROUP BY c.country_code, c.name
-        ORDER BY count DESC
-    """)
-    result.show(truncate=False)
-    end = time.time()
-    print(f"Zeit zweite: {end - start}")
+
 
 # c) Stationen in Deutschland
 def get_german_stations():
-    start = time.time()
-    result = spark.sql("SELECT * FROM stations WHERE country = 'GM' ORDER BY name")
-    result.show()
-    end = time.time()
-    print(f"Zeit: {end - start}")
-    # Zweite
-    start = time.time()
-    result = spark.sql("SELECT * FROM stations WHERE country = 'GM' ORDER BY name")
-    result.show()
-    end = time.time()
-    print(f"Zeit zweite: {end - start}")
+    result = spark.sql("""
+        SELECT *
+        FROM ghcndstations
+        WHERE countrycode = 'GM'
+        ORDER BY stationname
+    """)
+    result.show(truncate=False)
+
 
 # d) Plot TMAX und TMIN für Station und Jahr
 def plot_temp_day(station_name, year):
-    # Station ID finden
-    station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0]
-    # Daten filtern
     df_filtered = spark.sql(f"""
-        SELECT date, TMAX, TMIN FROM ghcnd_data
-        WHERE station = '{station_id}' AND year(date) = {year}
-        ORDER BY date
-    """).toPandas()
-    # Temperaturen in Grad umrechnen
-    df_filtered['TMAX'] /= 10
-    df_filtered['TMIN'] /= 10
-    # Tage des Jahres
-    df_filtered['day_of_year'] = df_filtered['date'].dt.dayofyear
-    plt.plot(df_filtered['day_of_year'], df_filtered['TMAX'], 'r', label='TMAX')
-    plt.plot(df_filtered['day_of_year'], df_filtered['TMIN'], 'b', label='TMIN')
+        SELECT
+            d.date,
+            d.value / 10.0 AS temp,
+            d.element
+        FROM ghcnddata d
+        JOIN ghcndstations s
+            ON d.stationid = s.stationid
+        WHERE
+            trim(upper(s.stationname)) = '{station_name.upper()}'
+            AND year(d.date) = {year}
+            AND d.element IN ('TMAX', 'TMIN')
+        ORDER BY d.date
+    """).collect()
+
+    if not df_filtered:
+        print(f"Keine Daten für Station '{station_name}' im Jahr {year} (oder Station nicht gefunden).")
+        return
+
+    # Daten in Dicts organisieren
+    tmax_data = {row['date']: row['temp'] for row in df_filtered if row['element'] == 'TMAX'}
+    tmin_data = {row['date']: row['temp'] for row in df_filtered if row['element'] == 'TMIN'}
+
+    # Sortieren nach Datum
+    dates = sorted(set(tmax_data.keys()) | set(tmin_data.keys()))
+    tmax_vals = [tmax_data.get(d, None) for d in dates]
+    tmin_vals = [tmin_data.get(d, None) for d in dates]
+    day_of_year = [d.timetuple().tm_yday for d in dates]
+
+    plt.plot(day_of_year, tmax_vals, 'r', label='TMAX')
+    plt.plot(day_of_year, tmin_vals, 'b', label='TMIN')
     plt.xlabel('Tag des Jahres')
     plt.ylabel('Temperatur (°C)')
     plt.title(f'{station_name} {year}')
     plt.legend()
     plt.show()
+
 
 # e) Gesamt-Niederschlag pro Jahr für Station
 def plot_precip_year(station_name):
-    station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0]
     df_precip = spark.sql(f"""
-        SELECT year(date) as year, SUM(PRCP)/10 as total_precip
-        FROM ghcnd_data
-        WHERE station = '{station_id}'
-        GROUP BY year(date)
+        SELECT
+            year(d.date) AS year,
+            SUM(d.value) / 10.0 AS total_precip
+        FROM ghcnddata d
+        JOIN ghcndstations s
+            ON d.stationid = s.stationid
+        WHERE
+            trim(upper(s.stationname)) = '{station_name.upper()}'
+            AND d.element = 'PRCP'
+        GROUP BY year(d.date)
        ORDER BY year
-    """).toPandas()
-    plt.bar(df_precip['year'], df_precip['total_precip'])
+    """).collect()
+
+    if not df_precip:
+        print(f"Keine Niederschlagsdaten für Station '{station_name}'.")
+        return
+
+    years = [row['year'] for row in df_precip]
+    total_precip = [row['total_precip'] for row in df_precip]
+
+    plt.bar(years, total_precip)
     plt.xlabel('Jahr')
     plt.ylabel('Niederschlag (mm)')
     plt.title(f'Gesamt-Niederschlag {station_name}')
     plt.show()
+
 
 # f) Durchschnitt TMAX pro Tag des Jahres, mit 21-Tage Durchschnitt
 def plot_avg_tmax_day(station_name):
-    station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0]
     df_avg = spark.sql(f"""
-        SELECT dayofyear(date) as day, AVG(TMAX)/10 as avg_tmax
-        FROM ghcnd_data
-        WHERE station = '{station_id}'
-        GROUP BY dayofyear(date)
+        SELECT
+            dayofyear(d.date) AS day,
+            AVG(d.value) / 10.0 AS avg_tmax
+        FROM ghcnddata d
+        JOIN ghcndstations s
+            ON d.stationid = s.stationid
+        WHERE
+            trim(upper(s.stationname)) = '{station_name.upper()}'
+            AND d.element = 'TMAX'
+        GROUP BY dayofyear(d.date)
        ORDER BY day
-    """).toPandas()
-    # 21-Tage Durchschnitt
-    df_avg['rolling_avg'] = df_avg['avg_tmax'].rolling(21, center=True).mean()
-    plt.plot(df_avg['day'], df_avg['avg_tmax'], label='Täglich')
-    plt.plot(df_avg['day'], df_avg['rolling_avg'], label='21-Tage')
+    """).collect()
+
+    if not df_avg:
+        print(f"Keine TMAX-Daten für Station '{station_name}'.")
+        return
+
+    days = [row['day'] for row in df_avg]
+    avg_tmax = [row['avg_tmax'] for row in df_avg]
+
+    # 21-Tage gleitender Durchschnitt (10 Tage davor, Tag selbst, 10 Tage danach)
+    rolling_avg = []
+    for i in range(len(avg_tmax)):
+        start = max(0, i - 10)
+        end = min(len(avg_tmax), i + 11)
+        rolling_avg.append(sum(avg_tmax[start:end]) / (end - start))
+
+    plt.plot(days, avg_tmax, label='Täglich')
+    plt.plot(days, rolling_avg, label='21-Tage')
     plt.xlabel('Tag des Jahres')
     plt.ylabel('Durchschnitt TMAX (°C)')
     plt.title(f'Durchschnitt TMAX {station_name}')
     plt.legend()
     plt.show()
+
 
 # g) Durchschnitt TMAX und TMIN pro Jahr für Station
 def plot_temp_year(station_name):
-    station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0]
     df_temp = spark.sql(f"""
-        SELECT year(date) as year, AVG(TMAX)/10 as avg_tmax, AVG(TMIN)/10 as avg_tmin
-        FROM ghcnd_data
-        WHERE station = '{station_id}'
-        GROUP BY year(date)
+        SELECT
+            year(d.date) AS year,
+            AVG(CASE WHEN d.element = 'TMAX' THEN d.value END) / 10.0 AS avg_tmax,
+            AVG(CASE WHEN d.element = 'TMIN' THEN d.value END) / 10.0 AS avg_tmin
+        FROM ghcnddata d
+        JOIN ghcndstations s
+            ON d.stationid = s.stationid
+        WHERE
+            trim(upper(s.stationname)) = '{station_name.upper()}'
+            AND d.element IN ('TMAX', 'TMIN')
+        GROUP BY year(d.date)
        ORDER BY year
-    """).toPandas()
-    plt.plot(df_temp['year'], df_temp['avg_tmax'], 'r', label='TMAX')
-    plt.plot(df_temp['year'], df_temp['avg_tmin'], 'b', label='TMIN')
+    """).collect()
+
+    if not df_temp:
+        print(f"Keine Temperaturdaten für Station '{station_name}'.")
+        return
+
+    years = [row['year'] for row in df_temp]
+    avg_tmax = [row['avg_tmax'] for row in df_temp]
+    avg_tmin = [row['avg_tmin'] for row in df_temp]
+
+    plt.plot(years, avg_tmax, 'r', label='TMAX')
+    plt.plot(years, avg_tmin, 'b', label='TMIN')
     plt.xlabel('Jahr')
     plt.ylabel('Temperatur (°C)')
     plt.title(f'Temperatur {station_name}')
     plt.legend()
     plt.show()
+
 
 # h) Durchschnitt TMAX pro Jahr und 20-Jahre Durchschnitt
 def plot_tmax_trend(station_name):
-    station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0]
     df_trend = spark.sql(f"""
-        SELECT year(date) as year, AVG(TMAX)/10 as avg_tmax
-        FROM ghcnd_data
-        WHERE station = '{station_id}'
-        GROUP BY year(date)
+        SELECT
+            year(d.date) AS year,
+            AVG(d.value) / 10.0 AS avg_tmax
+        FROM ghcnddata d
+        JOIN ghcndstations s
+            ON d.stationid = s.stationid
+        WHERE
+            trim(upper(s.stationname)) = '{station_name.upper()}'
+            AND d.element = 'TMAX'
+        GROUP BY year(d.date)
        ORDER BY year
-    """).toPandas()
-    # 20-Jahre Durchschnitt
-    df_trend['rolling_avg'] = df_trend['avg_tmax'].rolling(20, center=True).mean()
-    plt.plot(df_trend['year'], df_trend['avg_tmax'], label='Jährlich')
-    plt.plot(df_trend['year'], df_trend['rolling_avg'], label='20-Jahre')
+    """).collect()
+
+    if not df_trend:
+        print(f"Keine TMAX-Daten für Station '{station_name}'.")
+        return
+
+    years = [row['year'] for row in df_trend]
+    avg_tmax = [row['avg_tmax'] for row in df_trend]
+
+    rolling_avg = []
+    for i in range(len(avg_tmax)):
+        start = max(0, i - 9)
+        end = min(len(avg_tmax), i + 11)
+        rolling_avg.append(sum(avg_tmax[start:end]) / (end - start))
+
+    plt.plot(years, avg_tmax, label='Jährlich')
+    plt.plot(years, rolling_avg, label='20-Jahre')
     plt.xlabel('Jahr')
     plt.ylabel('Durchschnitt TMAX (°C)')
     plt.title(f'TMAX Trend {station_name}')
     plt.legend()
     plt.show()
+
 
 # i) Korrelation TMIN und TMAX pro Jahr
 def plot_corr_temp(station_name):
-    station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0]
     df_corr = spark.sql(f"""
-        SELECT year(date) as year, corr(TMIN, TMAX) as correlation
+        SELECT
+            year(date) AS year,
+            corr(tmin_val, tmax_val) AS correlation
         FROM (
-            SELECT date, TMIN, TMAX
-            FROM ghcnd_data
-            WHERE station = '{station_id}' AND TMIN IS NOT NULL AND TMAX IS NOT NULL
-        )
-        GROUP BY year(date)
+            SELECT
+                d.date,
+                MAX(CASE WHEN d.element = 'TMIN' THEN d.value END) AS tmin_val,
+                MAX(CASE WHEN d.element = 'TMAX' THEN d.value END) AS tmax_val
+            FROM ghcnddata d
+            JOIN ghcndstations s
+                ON d.stationid = s.stationid
+            WHERE
+                trim(upper(s.stationname)) = '{station_name.upper()}'
+                AND d.element IN ('TMIN', 'TMAX')
+            GROUP BY d.date
+        )
+        GROUP BY year(date)
        ORDER BY year
-    """).toPandas()
-    plt.plot(df_corr['year'], df_corr['correlation'])
+    """).collect()
+
+    if not df_corr:
+        print(f"Keine Korrelationsdaten für Station '{station_name}'.")
+        return
+
+    years = [row['year'] for row in df_corr]
+    correlation = [row['correlation'] for row in df_corr]
+
+    plt.plot(years, correlation)
     plt.xlabel('Jahr')
     plt.ylabel('Korrelation TMIN-TMAX')
     plt.title(f'Korrelation {station_name}')
     plt.show()
+
 
 def main(scon, spark):
     # Daten laden
     ghcnd_stations.read_ghcnd_from_parquet(spark)
-    
+
     # a) Liste aller Stationen
+    print("a)")
     get_all_stations()
-    
+
     # b) Anzahl Stationen je Land
+    print("b)")
     get_station_count_per_country()
-    
+
     # c) Stationen in Deutschland
+    print("c)")
     get_german_stations()
-    
+
     # d) Plot für Kempten, Hohenpeissenberg, Zugspitze
-    plot_temp_day('KEMPTEN', 2020)
-    plot_temp_day('HOHENPEISSENBERG', 2020)
-    plot_temp_day('ZUGSPITZE', 2020)
-    
+    print("d)")
+    plot_temp_day('KEMPTEN', 2024)
+    plot_temp_day('HOHENPEISSENBERG', 2024)
+    plot_temp_day('ZUGSPITZE', 2024)
+
     # e) Niederschlag
+    print("e)")
     plot_precip_year('KEMPTEN')
     plot_precip_year('HOHENPEISSENBERG')
     plot_precip_year('ZUGSPITZE')
-    
+
     # f) Durchschnitt TMAX
+    print("f)")
     plot_avg_tmax_day('KEMPTEN')
     plot_avg_tmax_day('HOHENPEISSENBERG')
     plot_avg_tmax_day('ZUGSPITZE')
-    
+
     # g) Temperatur pro Jahr
+    print("g)")
     plot_temp_year('KEMPTEN')
     plot_temp_year('HOHENPEISSENBERG')
     plot_temp_year('ZUGSPITZE')
-    
+
     # h) TMAX Trend
+    print("h)")
     plot_tmax_trend('KEMPTEN')
     plot_tmax_trend('HOHENPEISSENBERG')
     plot_tmax_trend('ZUGSPITZE')
-    
+
     # i) Korrelation
+    print("i)")
     plot_corr_temp('KEMPTEN')
     plot_corr_temp('HOHENPEISSENBERG')
     plot_corr_temp('ZUGSPITZE')
+
 
 if __name__ == "__main__":
-    main(scon, spark)
\ No newline at end of file
+    main(scon, spark)
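Note (not part of the patch): the 21-day and 20-year smoothing in Aufgabe 8 replaces pandas' rolling(..., center=True) with a hand-rolled, edge-clipped centered window over plain lists. Below is a minimal standalone sketch of that window arithmetic with a tiny self-check; the helper name, parameters, and test data are illustrative assumptions, not code from the repository.

# Hedged sketch: centered rolling mean clipped at the list ends,
# mirroring the window logic used in plot_avg_tmax_day / plot_tmax_trend.
def centered_rolling_mean(values, before, after):
    """Mean over values[i-before : i+after+1], clipped at both ends."""
    result = []
    for i in range(len(values)):
        lo = max(0, i - before)          # window start, clipped at index 0
        hi = min(len(values), i + after + 1)  # window end, clipped at the last index
        result.append(sum(values[lo:hi]) / (hi - lo))
    return result

if __name__ == "__main__":
    data = [1.0, 2.0, 3.0, 4.0, 5.0]
    # 3-point centered window (before=1, after=1) -> [1.5, 2.0, 3.0, 4.0, 4.5]
    print(centered_rolling_mean(data, before=1, after=1))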