diff --git a/Aufgabe 6/main.py b/Aufgabe 6/main.py new file mode 100644 index 0000000..b6ecd77 --- /dev/null +++ b/Aufgabe 6/main.py @@ -0,0 +1,59 @@ +def a(scon, spark, path): + rdd = scon.textFile(path) + return rdd + +def b(rdd): + print("\n=== Aufgabe 6b ===") + total_chars = rdd.map(lambda line: len(line)).reduce(lambda a, b: a + b) + print("Anzahl aller Zeichen im Text:", total_chars) + return total_chars + +def c(rdd, top_n=20): + print("\n=== Aufgabe 6c ===") + chars = rdd.flatMap(lambda line: list(line)) + freq = chars.map(lambda c: (c, 1)).reduceByKey(lambda a, b: a + b) + top_chars = freq.takeOrdered(top_n, key = lambda x: -x[1]) + + print(f"\n Top {top_n} häufigste Zeichen:") + for ch, count in top_chars: + print(f"'{ch}': {count}") + + unique_chars = chars.distinct().collect() + print("\nAnzahl verschiedener Zeichen:", len(unique_chars)) + return freq + +def d(rdd, top_n=20): + print("\n=== Aufgabe 6d ===") + from string import punctuation + def remove_punct(line): + return "".join([" " if ch in punctuation else ch for ch in line]) + + cleaned = rdd.map(remove_punct) + + words = (cleaned + .flatMap(lambda line: line.split()) + .map(lambda w: w.lower()) + .filter(lambda w: w != "")) + + word_freq = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b) + top_words = word_freq.takeOrdered(top_n, key=lambda x: -x[1]) + + print(f"\nTop {top_n} häufigste Wörter:") + for w, c in top_words: + print(f"{w}: {c}") + + return word_freq + +def e(scon,spark, path, top_n=20): + rdd = a(scon, spark, path) + return d(rdd, top_n) + +def main(scon, spark): + """ + main(scon, spark) + """ + rdd = a(scon, spark, "/data/texte/test/robinsonCrusoe.txt") + b(rdd) + c(rdd) + d(rdd) + e(scon, spark, "/data/texte/test/DonQuijote.txt") \ No newline at end of file diff --git a/Aufgabe 7/main.py b/Aufgabe 7/main.py new file mode 100644 index 0000000..03a02e9 --- /dev/null +++ b/Aufgabe 7/main.py @@ -0,0 +1,92 @@ +def readAllLanguages(scone, lang_file): + return (scone.textFile(lang_file) + .map(lambda line: line.strip().split(",")) + .filter(lambda x: len(x) == 2) + .map(lambda x: (x[1].lower(), x[0].lower()))) + +def readAllTexts(scon, directory, partitions=1000): + import os + + rdd = scon.wholeTextFiles(directory, minPartitions=partitions) + rdd = rdd.map(lambda x: (os.path.basename(x[0]), x[1])) + return rdd + + +def clean_and_split(rdd): + from string import punctuation + + def clean_text(text): + for ch in punctuation + "\n\t\r": + text = text.replace(ch, " ") + return text + + return (rdd.flatMap(lambda x: [(x[0], w.lower()) + for w in clean_text(x[1]).split(" ") + if w.strip() != ""])) + + +def unique_words_per_file(word_rdd): + return (word_rdd.distinct() + .map(lambda x: (x[1], x[0]))) + + +def join_with_languages(word_file_rdd, lang_rdd): + return word_file_rdd.join(lang_rdd) + + + +def count_words_per_language(joined_rdd): + return (joined_rdd + .map(lambda x: ((x[1][0], x[1][1]), 1)) + .reduceByKey(lambda a, b: a + b)) + + + +def detect_language_per_file(count_rdd): + return (count_rdd + .map(lambda x: (x[0][0], (x[0][1], x[1]))) + .reduceByKey(lambda a, b: a if a[1] > b[1] else b) + .map(lambda x: (x[0], x[1][0]))) + + +def count_texts_per_language(lang_detected_rdd): + return (lang_detected_rdd + .map(lambda x: (x[1], 1)) + .reduceByKey(lambda a, b: a + b) + .sortBy(lambda x: -x[1])) + + +def detect_languages(scon, text_dir, lang_file, partitions=1000): + """ + detect_languages(scon, "/data/texte/txt", "/data/texte/languages.txt") + """ + import time + + start_time = time.time() + + 
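+    # Pipeline overview: build a dictionary RDD (word -> language), read all
+    # texts as (filename, content) pairs, split them into (filename, word)
+    # pairs, keep each word only once per file, join the words against the
+    # dictionary, count the hits per (filename, language), keep the language
+    # with the most hits for every file and finally count the files per language.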
lang_rdd = readAllLanguages(scon, lang_file) + + texts_rdd = readAllTexts(scon, text_dir, partitions=partitions) + + words_rdd = clean_and_split(texts_rdd) + unique_rdd = unique_words_per_file(words_rdd) + joined_rdd = join_with_languages(unique_rdd, lang_rdd) + counts_rdd = count_words_per_language(joined_rdd) + detected_rdd = detect_language_per_file(counts_rdd) + summary_rdd = count_texts_per_language(detected_rdd) + + detected = detected_rdd.collect() + summary = summary_rdd.collect() + + end_time = time.time() + duration = end_time - start_time + + print(f"Abgeschlossen in {duration:.2f} Sekunden\n") + + print("Erkannte Sprachen pro Datei:") + for f, lang in detected[:20]: + print(f"{f}: {lang}") + + print("\nGesamtanzahl Texte pro Sprache:") + for lang, count in summary: + print(f"{lang}: {count}") \ No newline at end of file diff --git a/Aufgabe 8/ghcnd_stations.py b/Aufgabe 8/ghcnd_stations.py new file mode 100644 index 0000000..06eb7bd --- /dev/null +++ b/Aufgabe 8/ghcnd_stations.py @@ -0,0 +1,689 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Load stations, countries, inventory and data from GHCND as Dataset. + +@author: steger + +""" + +# pylint: disable=pointless-string-statement + +import os +from datetime import date +from time import time +from subprocess import call +from pyspark.sql.types import StructType +from pyspark.sql.types import StructField +from pyspark.sql.types import StringType +from pyspark.sql.types import FloatType +from pyspark.sql.types import IntegerType +from pyspark.sql.types import DateType + + +# ============================================= +# run sparkstart.py before to create a session +# ============================================= + +HDFSPATH = "hdfs://193.174.205.250:54310/" +GHCNDPATH = HDFSPATH + "ghcnd/" +GHCNDHOMEPATH = "/data/ghcnd/" + + +def conv_elevation(elev): + """ + Convert an elevation value. + + -999.9 means there is no value. + + Parameters + ---------- + elev : string + The elevation to convert to float. + + Returns + ------- + res : numeric + The converted value as float. + """ + elev = elev.strip() + if elev == "-999.9": + res = None + else: + res = float(elev) + return res + + +def conv_data_value(line, start): + """ + Convert a single data value from a dly.- File. + + Parameters + ---------- + line : string + The line with the data value. + start : int + The index at which the value starts. + + Returns + ------- + res : numeric + The onverted data value as int. + """ + return int(line[start:start+5].strip()) + + +def import_ghcnd_stations(scon, spark, path): + """ + Read the station data into a dataframe. + + Register it as temporary view and write it to parquet. + + Parameters + ---------- + scon : SparkContext + The spark context. + spark : SparkSession + The SQL session. + + Returns + ------- + stationFrame : DataFrame + The spark Data Frame with the stations data. 
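+
+    Notes
+    -----
+    ghcnd-stations.txt is a fixed-width file; the character slices used
+    below correspond to: station id (0:11, of which 0:2 is the country
+    code and 2:3 the network code), latitude (12:20), longitude (21:30),
+    elevation (31:37, where -999.9 means missing) and station name (41:71).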
+ """ + stationlines = scon.textFile(path + "ghcnd-stations.txt") + stationsplitlines = stationlines.map( + lambda l: + (l[0:2], + l[2:3], + l[0:11], + float(l[12:20].strip()), + float(l[21:30].strip()), + conv_elevation(l[31:37]), + l[41:71] + )) + stationschema = StructType([ + StructField('countrycode', StringType(), True), + StructField('networkcode', StringType(), True), + StructField('stationid', StringType(), True), + StructField('latitude', FloatType(), True), + StructField('longitude', FloatType(), True), + StructField('elevation', FloatType(), True), + StructField('stationname', StringType(), True) + ]) + stationframe = spark.createDataFrame(stationsplitlines, + schema=stationschema) + stationframe.createOrReplaceTempView("ghcndstations") + stationframe.write.mode('overwrite').parquet( + GHCNDPATH + "ghcndstations.parquet") + stationframe.cache() + print("Imported GhcndStations") + return stationframe + + +def import_ghcnd_countries(scon, spark, path): + """ + Read the countries data into a dataframe. + + Register it as temptable and write it to parquet. + + Parameters + ---------- + scon : SparkContext + The spark context. + spark : SparkSession + The SQL session. + path : string + The path where the file with data resides. + + Returns + ------- + stationFrame : DataFrame + The spark Data Frame with the countries data. + """ + countrylines = scon.textFile(path + "ghcnd-countries.txt") + countrysplitlines = countrylines.map(lambda l: (l[0:2], l[2:50])) + countryschema = StructType([ + StructField('countrycode', StringType(), True), + StructField('countryname', StringType(), True)]) + countryframe = spark.createDataFrame(countrysplitlines, countryschema) + countryframe.createOrReplaceTempView("ghcndcountries") + countryframe.write.mode('overwrite').parquet( + GHCNDPATH + "ghcndcountries.parquet") + countryframe.cache() + print("Imported GhcndCountries") + return countryframe + + +def conv_data_line(line): + """ + Convert a data line from GHCND-Datafile (.dly). + + Parameters + ---------- + line : string + String with a data line containing the values for one month. + + Returns + ------- + list of tuple + List containing a tuple for each data value. + """ + if line == '': + return [] + + countrycode = line[0:2] + networkcode = line[2:3] + stationid = line[0:11] + year = int(line[11:15]) + month = int(line[15:17]) + element = line[17:21] + datlst = [] + for i in range(0, 30): + val = conv_data_value(line, 21 + i*8) + if val != -9999: + datlst.append((countrycode, networkcode, stationid, + year, month, i+1, + date(year, month, i+1), + element, + val)) + return datlst + + +def read_dly_file(scon, spark, filename): + """ + Read a .dly-file into a data frame. + + Parameters + ---------- + scon : SparkContext + The spark context. + spark : SparkSession + The SQL session. + filename : string + The name and path of the dly-File. + + Returns + ------- + RDD + The RDD with the contents of the dly-File. + """ + dly = scon.textFile(filename) + return process_dly_file_lines(spark, dly) + + +def process_dly_file_lines(spark, lines): + """ + Process the lines of one dly file. + + Parameters + ---------- + spark : SparkSession + The SQL session. + lines : RDD + RDD with one value per line. + + Returns + ------- + dlyFrame : DataFram + Data Frame containing the data of the file. 
+ + """ + dlsplit = lines.flatMap(conv_data_line) + dlyfileschema = StructType([ + StructField('countrycode', StringType(), True), + StructField('networkcode', StringType(), True), + StructField('stationid', StringType(), True), + StructField('year', IntegerType(), True), + StructField('month', IntegerType(), True), + StructField('day', IntegerType(), True), + StructField('date', DateType(), True), + StructField('element', StringType(), True), + StructField('value', IntegerType(), True) + ]) + dlyframe = spark.createDataFrame(dlsplit, dlyfileschema) + return dlyframe + + +def import_data_rdd_parallel(scon, spark, path): + """ + Import the data files from ghcnd in parallel. + + This is much faster on a cluster or a computer with many cores + and enough main memory to hold all the raw data. + + Parameters + ---------- + scon : SparkContext + The context. + spark : SparkSession + The SQL session. + + Returns + ------- + None. + """ + rdd = scon.textFile( + path+"/ghcnd_all/*.dly", minPartitions=5000) + rddcoa = rdd.coalesce(5000) + + rddsplit = rddcoa.flatMap(conv_data_line) + print("Number of data records = " + str(rddsplit.count())) + print("Number of partitions = " + str(rddsplit.getNumPartitions())) + + dlyfileschema = StructType([ + StructField('countrycode', StringType(), True), + StructField('networkcode', StringType(), True), + StructField('stationid', StringType(), True), + StructField('year', IntegerType(), True), + StructField('month', IntegerType(), True), + StructField('day', IntegerType(), True), + StructField('date', DateType(), True), + StructField('element', StringType(), True), + StructField('value', IntegerType(), True) + ]) + dlyframe = spark.createDataFrame(rddsplit, dlyfileschema) + + dlyframe.show(10) + + dlyframe.write.mode('overwrite').parquet( + GHCNDPATH + "ghcnddata.parquet") + print(os.system("hdfs dfs -du -s /ghcnd/ghcnddata.parquet")) + + +def import_data_rdd_parallel_whole(scon, spark, path): + """ + Import the data files from ghcnd in parallel. + + This is much faster on a cluster or a computer with many cores + and enough main memory to hold all the raw data. + + Parameters + ---------- + scon : SparkContext + The context. + spark : SparkSession + The SQL session. + + Returns + ------- + None. 
+ """ + rdd = scon.wholeTextFiles( + path+"/ghcnd_all/*.dly", minPartitions=5000 ) + + rddvals = rdd.values() + print("Number of files in GHCND = " + str(rddvals.count())) + rddlen = rddvals.map(len) + print("Number of characters in all files = " + + str(rddlen.reduce(lambda x, y: x + y))) + + rddlines = rddvals.flatMap(lambda x: x.split("\n")) + print("Number of lines with data = " + str(rddlines.count())) + + rddsplit = rddlines.flatMap(conv_data_line) + print("Number of data records = " + str(rddsplit.count())) + print("Number of partitions = " + str(rddsplit.getNumPartitions())) + + dlyfileschema = StructType([ + StructField('countrycode', StringType(), True), + StructField('networkcode', StringType(), True), + StructField('stationid', StringType(), True), + StructField('year', IntegerType(), True), + StructField('month', IntegerType(), True), + StructField('day', IntegerType(), True), + StructField('date', DateType(), True), + StructField('element', StringType(), True), + StructField('value', IntegerType(), True) + ]) + dlyframe = spark.createDataFrame(rddsplit, dlyfileschema) + + dlyframe.show(10) + + dlyframe.write.mode('overwrite').parquet( + GHCNDPATH + "ghcnddata.parquet") + print(os.system("hdfs dfs -du -s /ghcnd/ghcnddata.parquet")) + + """ + Code for testing problems that resulted finally from empty lines + to solve the problem the code + if line == '': + return [] + was added at the beginning of convDataLine to filter away empty lines: + + noyear = rddsplit.filter(lambda x: not x[3].isnumeric()) + noyear.collect() + + rddlines1 = rdd.flatMap(lambda x: [(x[0], y) for y in x[1].split("\n")]) + print(rddlines1.count()) + + rddsplit1 = rddlines1.flatMap(convDataLine1) + print(rddsplit1.count()) + + noyear1 = rddsplit1.filter(lambda x: not x[1][3].isnumeric()) + noyear1.collect() + """ + + +def import_ghcnd_files_extern(scon, spark, path, stationlist, batchsize, + numparts): + """ + Import multiple data files in one batch. + + Import batchsize data files in one batch and append the data into + the parquet file. + + Parameters + ---------- + scon : SparkContext + The context. + spark : SparkSession + The SQL session. + path : string + Path of the data files. + stationlist : list + List of all stations to load. + batchsize : int + Number of files to load in one batch. + numparts : int + Number of partitions to write one batch. + + Returns + ------- + None. 
+ + """ + data = None + count = 0 + allcount = 0 + batchcount = 0 + for station in stationlist: + # filename = "file://" + path + "/" + station + ".dly" + filename = path + station + ".dly" + if os.path.isfile(filename): + dly = read_dly_file(spark, scon, "file://" + filename) + if data is not None: + data = data.union(dly) + print("Batch " + str(batchcount) + + " Filenr " + str(count) + " Processing " + filename) + else: + tstart = time() + data = dly + count += 1 + if count >= batchsize: + # data = data.sort('countrycode', 'stationid', 'date') + data = data.coalesce(numparts) + tcoalesce = time() + data.write.mode('Append').parquet( + GHCNDPATH + "ghcnddata.parquet") + anzrec = data.count() + twrite = time() + print( + "\n\nBatch " + str(batchcount) + + " #recs " + str(anzrec) + + " #files " + str(allcount) + + " readtime " + str.format("{:f}", tcoalesce - tstart) + + " writetime " + str.format("{:f}", twrite - tcoalesce) + + " recs/sec " + + str.format("{:f}", anzrec / (twrite - tstart)) + "\n\n") + allcount += count + count = 0 + batchcount += 1 + data = None + else: + print("importGhcndFilesExtern: " + station + + ", " + filename + " not found") + if data is not None: + data = data.coalesce(numparts) + data.write.mode('Append').parquet(GHCNDPATH + "ghcnddata.parquet") + + +def import_all_data(scon, spark, path): + """ + Import all data from GHCND. + + Parameters + ---------- + scon : SparkContext + The context. + spark : SparkSession + The SQL session. + path : string + Path of data files. + + Returns + ------- + None. + + """ + stationlist = spark.sql( + "SELECT stationid AS station \ + FROM ghcndstations \ + ORDER BY station") + pds = stationlist.toPandas() + import_ghcnd_files_extern(scon, spark, path + "ghcnd_all/", + pds.station, 30, 1) + + +def import_data_single_files(scon, spark, stationlist, parquetname, path): + """ + Import the data files one by one. + + Parameters + ---------- + scon : SparkContext + The context. + spark : SparkSession + The SQL session. + stationlist : list + List of all stations to import data. + parquetname : string + Name of the parquet file to write the data to. + path : string + Path where the data files reside. + + Returns + ------- + None. + + """ + pds = stationlist.toPandas() + cnt = 0 + for station in pds.station: + filename = path + station + ".dly" + if os.path.isfile(filename): + start = time() + dly = read_dly_file(spark, scon, "file://" + filename) + numrec = dly.count() + dly = dly.coalesce(1).sort('element', 'date') + read = time() + dly.write.mode('Append').parquet(GHCNDPATH + + parquetname + ".parquet") + finish = time() + print(str.format( + "{:8d} ", cnt) + station + + " #rec " + str.format("{:7d}", numrec) + + " read " + str.format("{:f}", read - start) + + " write " + str.format("{:f}", finish - read) + + " write/sec " + str.format("importDataSingleFiles{:f} ", + numrec/(finish - read)) + + " " + filename) + else: + print("#### " + str(cnt) + " File " + + filename + " does not exist ####") + cnt += 1 + + +def check_files(spark): + """ + Check if some files for generated stationnames do not exist. + + Parameters + ---------- + spark : SparkSession + The SQL session. + + Returns + ------- + None. 
+ + """ + stationlist = spark.sql( + "SELECT CONCAT(countrycode, networkcode, stationid) AS station \ + FROM ghcndstations \ + ORDER BY station") + pds = stationlist.toPandas() + count = 1 + for station in pds.station: + filename = "/nfs/home/steger/ghcnd/ghcnd_all/" + station + ".dly" + if os.path.isfile(filename): + # print(str(count) + " " + station) + pass + else: + print(str(count) + " File does not exist: " + filename) + count += 1 + + """ + Read the inventory data into a dataframe, + register it as temporary view and write it to parquet + """ + + +def import_ghcnd_inventory(scon, spark, path): + """ + Import inventory information from GHCND. + + Parameters + ---------- + scon : SparkContext + The context. + spark : SparkSession + The SQL session. + path : string + Path for inventory file. + + Returns + ------- + invframe : DataFrame + Data Frame with inventory data. + + """ + invlines = scon.textFile(path + "ghcnd-inventory.txt") + invsplitlines = invlines.map( + lambda l: + (l[0:2], + l[2:3], + l[0:11], + float(l[12:20].strip()), + float(l[21:30].strip()), + l[31:35], + int(l[36:40]), + int(l[41:45]) + )) + invschema = StructType([ + StructField('countrycode', StringType(), True), + StructField('networkcode', StringType(), True), + StructField('stationid', StringType(), True), + StructField('latitude', FloatType(), True), + StructField('longitude', FloatType(), True), + StructField('element', StringType(), True), + StructField('firstyear', IntegerType(), True), + StructField('lastyear', IntegerType(), True) + ]) + invframe = spark.createDataFrame(invsplitlines, invschema) + invframe.createOrReplaceTempView("ghcndinventory") + invframe.write.mode('overwrite').parquet( + GHCNDPATH + "ghcndinventory.parquet") + invframe.cache() + print("Imported GhcndInventory") + return invframe + + +def import_ghcnd_all(scon, spark): + """ + Import all files from GHCND. + + Parameters + ---------- + scon : SparkContext + The context. + spark : SparkSession + The SQL session. + + Returns + ------- + None. + + """ + localfilepath = "file://" + GHCNDHOMEPATH + import_ghcnd_countries(scon, spark, localfilepath) + import_ghcnd_stations(scon, spark, localfilepath) + import_ghcnd_inventory(scon, spark, localfilepath) + # import_all_data(scon, spark, GHCNDHOMEPATH) + import_data_rdd_parallel(scon, spark, localfilepath) + + +def read_ghcnd_from_parquet(spark): + """ + Read all data from the parquet files into Dataframes. + + Create temporary views from the parquet files. + + Parameters + ---------- + spark : SparkSession + The SQL Session. + + Returns + ------- + None. + + """ + dfcountries = spark.read.parquet(GHCNDPATH + "ghcndcountries") + dfcountries.createOrReplaceTempView("ghcndcountries") + dfcountries.cache() + + dfstations = spark.read.parquet(GHCNDPATH + "ghcndstations") + dfstations.createOrReplaceTempView("ghcndstations") + dfstations.cache() + + dfinventory = spark.read.parquet(GHCNDPATH + "ghcndinventory") + dfinventory.createOrReplaceTempView("ghcndinventory") + dfinventory.cache() + + dfdata = spark.read.parquet(GHCNDPATH + "ghcnddata") + dfdata.createOrReplaceTempView("ghcnddata") + dfdata.cache() + + +def delete_all_parquet_ghcnd(): + """ + Delete all parquet files that were imported from GHCND. + + Returns + ------- + None. 
+ + """ + delete_from_hdfs(GHCNDPATH + "ghcndstations.parquet") + delete_from_hdfs(GHCNDPATH + "ghcndcountries.parquet") + delete_from_hdfs(GHCNDPATH + "ghcndinventory.parquet") + delete_from_hdfs(GHCNDPATH + "ghcnddata.parquet") + + +def delete_from_hdfs(path): + """ + Delete the file in path from HDFS. + + Parameters + ---------- + path : string + Path of the file in HDFS. + + Returns + ------- + None. + + """ + call("hdfs dfs -rm -R " + path, + shell=True) \ No newline at end of file diff --git a/Aufgabe 8/main.py b/Aufgabe 8/main.py new file mode 100644 index 0000000..2cdc85d --- /dev/null +++ b/Aufgabe 8/main.py @@ -0,0 +1,220 @@ +from sparkstart import scon, spark +import ghcnd_stations +import matplotlib.pyplot as plt +import time + +# a) Liste aller Stationen sortiert nach Stationsname +def get_all_stations(): + start = time.time() + result = spark.sql("SELECT * FROM stations ORDER BY name") + result.show() + end = time.time() + print(f"Zeit: {end - start}") + # Zweite Ausführung + start = time.time() + result = spark.sql("SELECT * FROM stations ORDER BY name") + result.show() + end = time.time() + print(f"Zeit zweite Ausführung: {end - start}") + +# b) Anzahl der Stationen je Land +def get_station_count_per_country(): + start = time.time() + result = spark.sql(""" + SELECT c.country_code, c.name, COUNT(s.id) as count + FROM stations s + JOIN ghcndcountries c ON s.country = c.country_code + GROUP BY c.country_code, c.name + ORDER BY count DESC + """) + result.show(truncate=False) + end = time.time() + print(f"Zeit: {end - start}") + # Zweite + start = time.time() + result = spark.sql(""" + SELECT c.country_code, c.name, COUNT(s.id) as count + FROM stations s + JOIN ghcndcountries c ON s.country = c.country_code + GROUP BY c.country_code, c.name + ORDER BY count DESC + """) + result.show(truncate=False) + end = time.time() + print(f"Zeit zweite: {end - start}") + +# c) Stationen in Deutschland +def get_german_stations(): + start = time.time() + result = spark.sql("SELECT * FROM stations WHERE country = 'GM' ORDER BY name") + result.show() + end = time.time() + print(f"Zeit: {end - start}") + # Zweite + start = time.time() + result = spark.sql("SELECT * FROM stations WHERE country = 'GM' ORDER BY name") + result.show() + end = time.time() + print(f"Zeit zweite: {end - start}") + +# d) Plot TMAX und TMIN für Station und Jahr +def plot_temp_day(station_name, year): + # Station ID finden + station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0] + # Daten filtern + df_filtered = spark.sql(f""" + SELECT date, TMAX, TMIN FROM ghcnd_data + WHERE station = '{station_id}' AND year(date) = {year} + ORDER BY date + """).toPandas() + # Temperaturen in Grad umrechnen + df_filtered['TMAX'] /= 10 + df_filtered['TMIN'] /= 10 + # Tage des Jahres + df_filtered['day_of_year'] = df_filtered['date'].dt.dayofyear + plt.plot(df_filtered['day_of_year'], df_filtered['TMAX'], 'r', label='TMAX') + plt.plot(df_filtered['day_of_year'], df_filtered['TMIN'], 'b', label='TMIN') + plt.xlabel('Tag des Jahres') + plt.ylabel('Temperatur (°C)') + plt.title(f'{station_name} {year}') + plt.legend() + plt.show() + +# e) Gesamt-Niederschlag pro Jahr für Station +def plot_precip_year(station_name): + station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0] + df_precip = spark.sql(f""" + SELECT year(date) as year, SUM(PRCP)/10 as total_precip + FROM ghcnd_data + WHERE station = '{station_id}' + GROUP BY year(date) + ORDER BY year + 
""").toPandas() + plt.bar(df_precip['year'], df_precip['total_precip']) + plt.xlabel('Jahr') + plt.ylabel('Niederschlag (mm)') + plt.title(f'Gesamt-Niederschlag {station_name}') + plt.show() + +# f) Durchschnitt TMAX pro Tag des Jahres, mit 21-Tage Durchschnitt +def plot_avg_tmax_day(station_name): + station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0] + df_avg = spark.sql(f""" + SELECT dayofyear(date) as day, AVG(TMAX)/10 as avg_tmax + FROM ghcnd_data + WHERE station = '{station_id}' + GROUP BY dayofyear(date) + ORDER BY day + """).toPandas() + # 21-Tage Durchschnitt + df_avg['rolling_avg'] = df_avg['avg_tmax'].rolling(21, center=True).mean() + plt.plot(df_avg['day'], df_avg['avg_tmax'], label='Täglich') + plt.plot(df_avg['day'], df_avg['rolling_avg'], label='21-Tage') + plt.xlabel('Tag des Jahres') + plt.ylabel('Durchschnitt TMAX (°C)') + plt.title(f'Durchschnitt TMAX {station_name}') + plt.legend() + plt.show() + +# g) Durchschnitt TMAX und TMIN pro Jahr für Station +def plot_temp_year(station_name): + station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0] + df_temp = spark.sql(f""" + SELECT year(date) as year, AVG(TMAX)/10 as avg_tmax, AVG(TMIN)/10 as avg_tmin + FROM ghcnd_data + WHERE station = '{station_id}' + GROUP BY year(date) + ORDER BY year + """).toPandas() + plt.plot(df_temp['year'], df_temp['avg_tmax'], 'r', label='TMAX') + plt.plot(df_temp['year'], df_temp['avg_tmin'], 'b', label='TMIN') + plt.xlabel('Jahr') + plt.ylabel('Temperatur (°C)') + plt.title(f'Temperatur {station_name}') + plt.legend() + plt.show() + +# h) Durchschnitt TMAX pro Jahr und 20-Jahre Durchschnitt +def plot_tmax_trend(station_name): + station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0] + df_trend = spark.sql(f""" + SELECT year(date) as year, AVG(TMAX)/10 as avg_tmax + FROM ghcnd_data + WHERE station = '{station_id}' + GROUP BY year(date) + ORDER BY year + """).toPandas() + # 20-Jahre Durchschnitt + df_trend['rolling_avg'] = df_trend['avg_tmax'].rolling(20, center=True).mean() + plt.plot(df_trend['year'], df_trend['avg_tmax'], label='Jährlich') + plt.plot(df_trend['year'], df_trend['rolling_avg'], label='20-Jahre') + plt.xlabel('Jahr') + plt.ylabel('Durchschnitt TMAX (°C)') + plt.title(f'TMAX Trend {station_name}') + plt.legend() + plt.show() + +# i) Korrelation TMIN und TMAX pro Jahr +def plot_corr_temp(station_name): + station_id = spark.sql(f"SELECT id FROM stations WHERE name = '{station_name}'").collect()[0][0] + df_corr = spark.sql(f""" + SELECT year(date) as year, corr(TMIN, TMAX) as correlation + FROM ( + SELECT date, TMIN, TMAX + FROM ghcnd_data + WHERE station = '{station_id}' AND TMIN IS NOT NULL AND TMAX IS NOT NULL + ) + GROUP BY year(date) + ORDER BY year + """).toPandas() + plt.plot(df_corr['year'], df_corr['correlation']) + plt.xlabel('Jahr') + plt.ylabel('Korrelation TMIN-TMAX') + plt.title(f'Korrelation {station_name}') + plt.show() + +def main(scon, spark): + # Daten laden + ghcnd_stations.read_ghcnd_from_parquet(spark) + + # a) Liste aller Stationen + get_all_stations() + + # b) Anzahl Stationen je Land + get_station_count_per_country() + + # c) Stationen in Deutschland + get_german_stations() + + # d) Plot für Kempten, Hohenpeissenberg, Zugspitze + plot_temp_day('KEMPTEN', 2020) + plot_temp_day('HOHENPEISSENBERG', 2020) + plot_temp_day('ZUGSPITZE', 2020) + + # e) Niederschlag + plot_precip_year('KEMPTEN') + plot_precip_year('HOHENPEISSENBERG') + 
+    plot_precip_year('ZUGSPITZE')
+
+    # f) Durchschnitt TMAX
+    plot_avg_tmax_day('KEMPTEN')
+    plot_avg_tmax_day('HOHENPEISSENBERG')
+    plot_avg_tmax_day('ZUGSPITZE')
+
+    # g) Temperatur pro Jahr
+    plot_temp_year('KEMPTEN')
+    plot_temp_year('HOHENPEISSENBERG')
+    plot_temp_year('ZUGSPITZE')
+
+    # h) TMAX Trend
+    plot_tmax_trend('KEMPTEN')
+    plot_tmax_trend('HOHENPEISSENBERG')
+    plot_tmax_trend('ZUGSPITZE')
+
+    # i) Korrelation
+    plot_corr_temp('KEMPTEN')
+    plot_corr_temp('HOHENPEISSENBERG')
+    plot_corr_temp('ZUGSPITZE')
+
+if __name__ == "__main__":
+    main(scon, spark)
\ No newline at end of file
diff --git a/Aufgabe 8/sparkstart.py b/Aufgabe 8/sparkstart.py
new file mode 100644
index 0000000..bdb7010
--- /dev/null
+++ b/Aufgabe 8/sparkstart.py
@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+
+"""
+Erzeugen einer Spark-Konfiguration
+"""
+
+from pyspark import SparkConf, SparkContext
+from pyspark.sql import SparkSession
+
+# connect to cluster
+conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
+conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+conf.set("spark.executor.memory", '32g')
+conf.set("spark.driver.memory", '8g')
+conf.set("spark.cores.max", "40")
+scon = SparkContext(conf=conf)
+
+
+spark = SparkSession \
+    .builder \
+    .appName("Python Spark SQL") \
+    .getOrCreate()
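# ------------------------------------------------------------------------------
# Hedged sketch: the SQL in Aufgabe 8/main.py queries views named `stations`
# and `ghcnd_data` with wide TMAX/TMIN/PRCP columns, whereas ghcnd_stations.py
# registers the views `ghcndstations` (stationid, stationname, ...) and
# `ghcnddata` in long element/value form.  The hypothetical helper below (not
# defined anywhere in the diff) shows how one of the plots could be fed from
# those long-form views instead, assuming the ghcnd* views have been
# registered (e.g. via ghcnd_stations.read_ghcnd_from_parquet) and that
# TMAX/TMIN values are stored as tenths of degrees Celsius.
# ------------------------------------------------------------------------------
import matplotlib.pyplot as plt


def plot_temp_day_longform(spark, station_name, year):
    """Plot daily TMAX/TMIN for one station and year from the ghcnd* views."""
    # stationname is a fixed-width field, so match on the name prefix.
    station_id = spark.sql(
        f"SELECT stationid FROM ghcndstations "
        f"WHERE stationname LIKE '{station_name}%'"
    ).collect()[0][0]

    # Pivot the long element/value rows into one TMAX and one TMIN value per
    # day; the stored integers are tenths of degrees Celsius, hence /10.0.
    df = spark.sql(f"""
        SELECT date,
               MAX(CASE WHEN element = 'TMAX' THEN value END) / 10.0 AS tmax,
               MAX(CASE WHEN element = 'TMIN' THEN value END) / 10.0 AS tmin
        FROM ghcnddata
        WHERE stationid = '{station_id}' AND year = {year}
        GROUP BY date
        ORDER BY date
    """).toPandas()

    plt.plot(df['date'], df['tmax'], 'r', label='TMAX')
    plt.plot(df['date'], df['tmin'], 'b', label='TMIN')
    plt.xlabel('Datum')
    plt.ylabel('Temperatur (°C)')
    plt.title(f'{station_name} {year}')
    plt.legend()
    plt.show()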