From 90524173f424a10344b99fcbac783a077c3d83ec Mon Sep 17 00:00:00 2001 From: Valentin Heiserer Date: Fri, 28 Nov 2025 12:04:55 +0100 Subject: [PATCH] add 9 + 10 --- Aufgabe 10/Aufgabe10.py | 209 +++++++++++ Aufgabe 10/ghcnd_stations.py | 689 +++++++++++++++++++++++++++++++++++ Aufgabe 10/sparkstart.py | 22 ++ Aufgabe 8/main.py | 2 +- Aufgabe 9/Aufgabe9.py | 141 +++++++ Aufgabe 9/sparkstart.py | 21 ++ 6 files changed, 1083 insertions(+), 1 deletion(-) create mode 100644 Aufgabe 10/Aufgabe10.py create mode 100644 Aufgabe 10/ghcnd_stations.py create mode 100644 Aufgabe 10/sparkstart.py create mode 100644 Aufgabe 9/Aufgabe9.py create mode 100644 Aufgabe 9/sparkstart.py diff --git a/Aufgabe 10/Aufgabe10.py b/Aufgabe 10/Aufgabe10.py new file mode 100644 index 0000000..b2af35a --- /dev/null +++ b/Aufgabe 10/Aufgabe10.py @@ -0,0 +1,209 @@ +from sparkstart import scon, spark +import ghcnd_stations +import matplotlib.pyplot as plt +import time + +# a) Scatterplot: alle Stationen (lon/lat) +def plot_all_stations(spark): + q = """ + SELECT stationname, latitude, longitude + FROM cdc_stations + WHERE latitude IS NOT NULL AND longitude IS NOT NULL + """ + t0 = time.time() + rows = spark.sql(q).collect() + t1 = time.time() + print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Rows: {len(rows)}") + + lats = [r['latitude'] for r in rows] + lons = [r['longitude'] for r in rows] + names = [r['stationname'] for r in rows] + + plt.figure(figsize=(8,6)) + plt.scatter(lons, lats, s=10, alpha=0.6) + plt.xlabel('Longitude') + plt.ylabel('Latitude') + plt.title('Alle CDC-Stationen (Scatter)') + plt.grid(True) + plt.show() + + +# b) Scatterplot: Stationsdauer in Jahren als Marker-Size +def plot_station_duration(spark, size_factor=20): + q = """ + SELECT + stationname, + latitude, + longitude, + (CAST(CASE WHEN length(to_date) >= 4 THEN substr(to_date,1,4) ELSE year(current_date()) END AS INT) + - CAST(substr(from_date,1,4) AS INT)) AS years + FROM cdc_stations + WHERE latitude IS NOT NULL AND longitude IS NOT NULL + """ + t0 = time.time() + rows = spark.sql(q).collect() + t1 = time.time() + print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Rows: {len(rows)}") + + lats = [r['latitude'] for r in rows] + lons = [r['longitude'] for r in rows] + years = [r['years'] if r['years'] is not None else 0 for r in rows] + sizes = [max(5, (y+1) * size_factor) for y in years] + + plt.figure(figsize=(8,6)) + plt.scatter(lons, lats, s=sizes, alpha=0.6) + plt.xlabel('Longitude') + plt.ylabel('Latitude') + plt.title('CDC-Stationen: Dauer der Verfuegbarkeit (Größe ~ Jahre)') + plt.grid(True) + plt.show() + +def plot_frost_distribution_year(spark, year): + q = f""" + WITH daily_max AS ( + SELECT stationid, date, MAX(tt_tu) AS max_temp + FROM cdc_hourly + WHERE length(date) >= 8 AND substr(date,1,4) = '{year}' + GROUP BY stationid, date + ), + station_frost AS ( + SELECT dm.stationid, SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays + FROM daily_max dm + GROUP BY dm.stationid + ) + SELECT sf.frostdays, COUNT(*) AS stations + FROM station_frost sf + GROUP BY sf.frostdays + ORDER BY sf.frostdays + """ + t0 = time.time() + rows = spark.sql(q).collect() + t1 = time.time() + print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Distinct frostdays: {len(rows)}") + + x = [r['frostdays'] for r in rows] + y = [r['stations'] for r in rows] + + plt.figure(figsize=(8,5)) + plt.bar(x, y) + plt.xlabel('Anzahl Frosttage im Jahr ' + str(year)) + plt.ylabel('Anzahl Stationen') + plt.title(f'Verteilung der Frosttage pro Station im Jahr {year}') + 
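+    # Each bar: number of stations (y-axis) with exactly x days in the given
+    # year whose daily maximum of tt_tu stayed below 0 °C.
+    # Optional tweak (assumption: matplotlib.ticker.MaxNLocator, not otherwise
+    # used in this file) to force integer x-ticks for the frost-day counts:
+    # from matplotlib.ticker import MaxNLocator
+    # plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))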
plt.grid(True) + plt.show() + + +# c2) Frosttage Zeitreihe für eine Station mit 5- und 20-Jahres Durchschnitt (SQL window) +def plot_station_frost_timeseries(spark, station_name): + q = f""" + WITH daily_max AS ( + SELECT stationid, date, MAX(tt_tu) AS max_temp + FROM cdc_hourly + GROUP BY stationid, date + ), + yearly AS ( + SELECT + dm.stationid, + CAST(substr(dm.date,1,4) AS INT) AS year, + SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays + FROM daily_max dm + GROUP BY dm.stationid, CAST(substr(dm.date,1,4) AS INT) + ), + station_yearly AS ( + SELECT + y.year, + y.frostdays, + AVG(y.frostdays) OVER (ORDER BY y.year ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg5, + AVG(y.frostdays) OVER (ORDER BY y.year ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg20 + FROM yearly y + JOIN cdc_stations s ON y.stationid = s.stationid + WHERE trim(upper(s.stationname)) = '{station_name.upper()}' + ORDER BY y.year + ) + SELECT * FROM station_yearly + """ + t0 = time.time() + rows = spark.sql(q).collect() + t1 = time.time() + print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Years: {len(rows)}") + + if not rows: + print(f"Keine Daten f\u00fcr Station '{station_name}'.") + return + + years = [r['year'] for r in rows] + frostdays = [r['frostdays'] for r in rows] + avg5 = [r['avg5'] for r in rows] + avg20 = [r['avg20'] for r in rows] + + plt.figure(figsize=(10,5)) + plt.plot(years, frostdays, label='Frosttage (Jahr)') + plt.plot(years, avg5, label='5-Jahres-Durchschnitt') + plt.plot(years, avg20, label='20-Jahres-Durchschnitt') + plt.xlabel('Jahr') + plt.ylabel('Anzahl Frosttage') + plt.title(f'Frosttage f\u00fcr Station {station_name}') + plt.legend() + plt.grid(True) + plt.show() + + +# d) Korrelation Hoehe vs. Frosttage pro Jahr +def plot_height_frost_correlation(spark): + q = """ + WITH daily_max AS ( + SELECT stationid, date, MAX(tt_tu) AS max_temp + FROM cdc_hourly + GROUP BY stationid, date + ), + yearly AS ( + SELECT + dm.stationid, + CAST(substr(dm.date,1,4) AS INT) AS year, + SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays + FROM daily_max dm + GROUP BY dm.stationid, CAST(substr(dm.date,1,4) AS INT) + ), + joined AS ( + SELECT y.year, s.height, y.frostdays + FROM yearly y + JOIN cdc_stations s ON y.stationid = s.stationid + ), + yearly_corr AS ( + SELECT year, corr(height, frostdays) AS corr + FROM joined + GROUP BY year + ORDER BY year + ) + SELECT year, corr FROM yearly_corr WHERE corr IS NOT NULL + """ + t0 = time.time() + rows = spark.sql(q).collect() + t1 = time.time() + print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Years with corr: {len(rows)}") + + if not rows: + print("Keine Korrelationsdaten verfügbar.") + return + + years = [r['year'] for r in rows] + corr = [r['corr'] for r in rows] + + plt.figure(figsize=(10,5)) + plt.bar(years, corr) + plt.xlabel('Jahr') + plt.ylabel('Korrelationskoeffizient (height vs frostdays)') + plt.title('Korrelation Hoehe vs. 
Frosttage pro Jahr') + plt.grid(True) + plt.show() + + +if __name__ == '__main__': + ghcnd_stations.read_ghcnd_from_parquet(spark) + + plot_all_stations(spark) + plot_station_duration(spark) + plot_frost_distribution_year(spark, '2010') + plot_station_frost_timeseries(spark, 'KEMPTEN') + plot_height_frost_correlation(spark) + pass diff --git a/Aufgabe 10/ghcnd_stations.py b/Aufgabe 10/ghcnd_stations.py new file mode 100644 index 0000000..06eb7bd --- /dev/null +++ b/Aufgabe 10/ghcnd_stations.py @@ -0,0 +1,689 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Load stations, countries, inventory and data from GHCND as Dataset. + +@author: steger + +""" + +# pylint: disable=pointless-string-statement + +import os +from datetime import date +from time import time +from subprocess import call +from pyspark.sql.types import StructType +from pyspark.sql.types import StructField +from pyspark.sql.types import StringType +from pyspark.sql.types import FloatType +from pyspark.sql.types import IntegerType +from pyspark.sql.types import DateType + + +# ============================================= +# run sparkstart.py before to create a session +# ============================================= + +HDFSPATH = "hdfs://193.174.205.250:54310/" +GHCNDPATH = HDFSPATH + "ghcnd/" +GHCNDHOMEPATH = "/data/ghcnd/" + + +def conv_elevation(elev): + """ + Convert an elevation value. + + -999.9 means there is no value. + + Parameters + ---------- + elev : string + The elevation to convert to float. + + Returns + ------- + res : numeric + The converted value as float. + """ + elev = elev.strip() + if elev == "-999.9": + res = None + else: + res = float(elev) + return res + + +def conv_data_value(line, start): + """ + Convert a single data value from a dly.- File. + + Parameters + ---------- + line : string + The line with the data value. + start : int + The index at which the value starts. + + Returns + ------- + res : numeric + The onverted data value as int. + """ + return int(line[start:start+5].strip()) + + +def import_ghcnd_stations(scon, spark, path): + """ + Read the station data into a dataframe. + + Register it as temporary view and write it to parquet. + + Parameters + ---------- + scon : SparkContext + The spark context. + spark : SparkSession + The SQL session. + + Returns + ------- + stationFrame : DataFrame + The spark Data Frame with the stations data. + """ + stationlines = scon.textFile(path + "ghcnd-stations.txt") + stationsplitlines = stationlines.map( + lambda l: + (l[0:2], + l[2:3], + l[0:11], + float(l[12:20].strip()), + float(l[21:30].strip()), + conv_elevation(l[31:37]), + l[41:71] + )) + stationschema = StructType([ + StructField('countrycode', StringType(), True), + StructField('networkcode', StringType(), True), + StructField('stationid', StringType(), True), + StructField('latitude', FloatType(), True), + StructField('longitude', FloatType(), True), + StructField('elevation', FloatType(), True), + StructField('stationname', StringType(), True) + ]) + stationframe = spark.createDataFrame(stationsplitlines, + schema=stationschema) + stationframe.createOrReplaceTempView("ghcndstations") + stationframe.write.mode('overwrite').parquet( + GHCNDPATH + "ghcndstations.parquet") + stationframe.cache() + print("Imported GhcndStations") + return stationframe + + +def import_ghcnd_countries(scon, spark, path): + """ + Read the countries data into a dataframe. + + Register it as temptable and write it to parquet. + + Parameters + ---------- + scon : SparkContext + The spark context. 
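+        Used to read ghcnd-countries.txt as a text RDD.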
+    spark : SparkSession
+        The SQL session.
+    path : string
+        The path where the file with data resides.
+
+    Returns
+    -------
+    countryframe : DataFrame
+        The spark Data Frame with the countries data.
+    """
+    countrylines = scon.textFile(path + "ghcnd-countries.txt")
+    countrysplitlines = countrylines.map(lambda l: (l[0:2], l[2:50]))
+    countryschema = StructType([
+        StructField('countrycode', StringType(), True),
+        StructField('countryname', StringType(), True)])
+    countryframe = spark.createDataFrame(countrysplitlines, countryschema)
+    countryframe.createOrReplaceTempView("ghcndcountries")
+    countryframe.write.mode('overwrite').parquet(
+        GHCNDPATH + "ghcndcountries.parquet")
+    countryframe.cache()
+    print("Imported GhcndCountries")
+    return countryframe
+
+
+def conv_data_line(line):
+    """
+    Convert a data line from GHCND-Datafile (.dly).
+
+    Parameters
+    ----------
+    line : string
+        String with a data line containing the values for one month.
+
+    Returns
+    -------
+    list of tuple
+        List containing a tuple for each data value.
+    """
+    if line == '':
+        return []
+
+    countrycode = line[0:2]
+    networkcode = line[2:3]
+    stationid = line[0:11]
+    year = int(line[11:15])
+    month = int(line[15:17])
+    element = line[17:21]
+    datlst = []
+    # a .dly line carries 31 day slots; days that do not exist in the month
+    # are coded as -9999 and filtered out below
+    for i in range(0, 31):
+        val = conv_data_value(line, 21 + i*8)
+        if val != -9999:
+            datlst.append((countrycode, networkcode, stationid,
+                           year, month, i+1,
+                           date(year, month, i+1),
+                           element,
+                           val))
+    return datlst
+
+
+def read_dly_file(scon, spark, filename):
+    """
+    Read a .dly-file into a data frame.
+
+    Parameters
+    ----------
+    scon : SparkContext
+        The spark context.
+    spark : SparkSession
+        The SQL session.
+    filename : string
+        The name and path of the dly-File.
+
+    Returns
+    -------
+    DataFrame
+        Data Frame with the contents of the dly-File.
+    """
+    dly = scon.textFile(filename)
+    return process_dly_file_lines(spark, dly)
+
+
+def process_dly_file_lines(spark, lines):
+    """
+    Process the lines of one dly file.
+
+    Parameters
+    ----------
+    spark : SparkSession
+        The SQL session.
+    lines : RDD
+        RDD with one value per line.
+
+    Returns
+    -------
+    dlyframe : DataFrame
+        Data Frame containing the data of the file.
+
+    """
+    dlsplit = lines.flatMap(conv_data_line)
+    dlyfileschema = StructType([
+        StructField('countrycode', StringType(), True),
+        StructField('networkcode', StringType(), True),
+        StructField('stationid', StringType(), True),
+        StructField('year', IntegerType(), True),
+        StructField('month', IntegerType(), True),
+        StructField('day', IntegerType(), True),
+        StructField('date', DateType(), True),
+        StructField('element', StringType(), True),
+        StructField('value', IntegerType(), True)
+    ])
+    dlyframe = spark.createDataFrame(dlsplit, dlyfileschema)
+    return dlyframe
+
+
+def import_data_rdd_parallel(scon, spark, path):
+    """
+    Import the data files from ghcnd in parallel.
+
+    This is much faster on a cluster or a computer with many cores
+    and enough main memory to hold all the raw data.
+
+    Parameters
+    ----------
+    scon : SparkContext
+        The context.
+    spark : SparkSession
+        The SQL session.
+    path : string
+        The path below which ghcnd_all/*.dly resides.
+
+    Returns
+    -------
+    None.
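+
+    Examples
+    --------
+    Typical call, mirroring import_ghcnd_all below:
+
+    >>> import_data_rdd_parallel(scon, spark, "file://" + GHCNDHOMEPATH)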
+ """ + rdd = scon.textFile( + path+"/ghcnd_all/*.dly", minPartitions=5000) + rddcoa = rdd.coalesce(5000) + + rddsplit = rddcoa.flatMap(conv_data_line) + print("Number of data records = " + str(rddsplit.count())) + print("Number of partitions = " + str(rddsplit.getNumPartitions())) + + dlyfileschema = StructType([ + StructField('countrycode', StringType(), True), + StructField('networkcode', StringType(), True), + StructField('stationid', StringType(), True), + StructField('year', IntegerType(), True), + StructField('month', IntegerType(), True), + StructField('day', IntegerType(), True), + StructField('date', DateType(), True), + StructField('element', StringType(), True), + StructField('value', IntegerType(), True) + ]) + dlyframe = spark.createDataFrame(rddsplit, dlyfileschema) + + dlyframe.show(10) + + dlyframe.write.mode('overwrite').parquet( + GHCNDPATH + "ghcnddata.parquet") + print(os.system("hdfs dfs -du -s /ghcnd/ghcnddata.parquet")) + + +def import_data_rdd_parallel_whole(scon, spark, path): + """ + Import the data files from ghcnd in parallel. + + This is much faster on a cluster or a computer with many cores + and enough main memory to hold all the raw data. + + Parameters + ---------- + scon : SparkContext + The context. + spark : SparkSession + The SQL session. + + Returns + ------- + None. + """ + rdd = scon.wholeTextFiles( + path+"/ghcnd_all/*.dly", minPartitions=5000 ) + + rddvals = rdd.values() + print("Number of files in GHCND = " + str(rddvals.count())) + rddlen = rddvals.map(len) + print("Number of characters in all files = " + + str(rddlen.reduce(lambda x, y: x + y))) + + rddlines = rddvals.flatMap(lambda x: x.split("\n")) + print("Number of lines with data = " + str(rddlines.count())) + + rddsplit = rddlines.flatMap(conv_data_line) + print("Number of data records = " + str(rddsplit.count())) + print("Number of partitions = " + str(rddsplit.getNumPartitions())) + + dlyfileschema = StructType([ + StructField('countrycode', StringType(), True), + StructField('networkcode', StringType(), True), + StructField('stationid', StringType(), True), + StructField('year', IntegerType(), True), + StructField('month', IntegerType(), True), + StructField('day', IntegerType(), True), + StructField('date', DateType(), True), + StructField('element', StringType(), True), + StructField('value', IntegerType(), True) + ]) + dlyframe = spark.createDataFrame(rddsplit, dlyfileschema) + + dlyframe.show(10) + + dlyframe.write.mode('overwrite').parquet( + GHCNDPATH + "ghcnddata.parquet") + print(os.system("hdfs dfs -du -s /ghcnd/ghcnddata.parquet")) + + """ + Code for testing problems that resulted finally from empty lines + to solve the problem the code + if line == '': + return [] + was added at the beginning of convDataLine to filter away empty lines: + + noyear = rddsplit.filter(lambda x: not x[3].isnumeric()) + noyear.collect() + + rddlines1 = rdd.flatMap(lambda x: [(x[0], y) for y in x[1].split("\n")]) + print(rddlines1.count()) + + rddsplit1 = rddlines1.flatMap(convDataLine1) + print(rddsplit1.count()) + + noyear1 = rddsplit1.filter(lambda x: not x[1][3].isnumeric()) + noyear1.collect() + """ + + +def import_ghcnd_files_extern(scon, spark, path, stationlist, batchsize, + numparts): + """ + Import multiple data files in one batch. + + Import batchsize data files in one batch and append the data into + the parquet file. + + Parameters + ---------- + scon : SparkContext + The context. + spark : SparkSession + The SQL session. + path : string + Path of the data files. 
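+        Each station's file is expected on the local file system at
+        <path><stationid>.dly.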
+ stationlist : list + List of all stations to load. + batchsize : int + Number of files to load in one batch. + numparts : int + Number of partitions to write one batch. + + Returns + ------- + None. + + """ + data = None + count = 0 + allcount = 0 + batchcount = 0 + for station in stationlist: + # filename = "file://" + path + "/" + station + ".dly" + filename = path + station + ".dly" + if os.path.isfile(filename): + dly = read_dly_file(spark, scon, "file://" + filename) + if data is not None: + data = data.union(dly) + print("Batch " + str(batchcount) + + " Filenr " + str(count) + " Processing " + filename) + else: + tstart = time() + data = dly + count += 1 + if count >= batchsize: + # data = data.sort('countrycode', 'stationid', 'date') + data = data.coalesce(numparts) + tcoalesce = time() + data.write.mode('Append').parquet( + GHCNDPATH + "ghcnddata.parquet") + anzrec = data.count() + twrite = time() + print( + "\n\nBatch " + str(batchcount) + + " #recs " + str(anzrec) + + " #files " + str(allcount) + + " readtime " + str.format("{:f}", tcoalesce - tstart) + + " writetime " + str.format("{:f}", twrite - tcoalesce) + + " recs/sec " + + str.format("{:f}", anzrec / (twrite - tstart)) + "\n\n") + allcount += count + count = 0 + batchcount += 1 + data = None + else: + print("importGhcndFilesExtern: " + station + + ", " + filename + " not found") + if data is not None: + data = data.coalesce(numparts) + data.write.mode('Append').parquet(GHCNDPATH + "ghcnddata.parquet") + + +def import_all_data(scon, spark, path): + """ + Import all data from GHCND. + + Parameters + ---------- + scon : SparkContext + The context. + spark : SparkSession + The SQL session. + path : string + Path of data files. + + Returns + ------- + None. + + """ + stationlist = spark.sql( + "SELECT stationid AS station \ + FROM ghcndstations \ + ORDER BY station") + pds = stationlist.toPandas() + import_ghcnd_files_extern(scon, spark, path + "ghcnd_all/", + pds.station, 30, 1) + + +def import_data_single_files(scon, spark, stationlist, parquetname, path): + """ + Import the data files one by one. + + Parameters + ---------- + scon : SparkContext + The context. + spark : SparkSession + The SQL session. + stationlist : list + List of all stations to import data. + parquetname : string + Name of the parquet file to write the data to. + path : string + Path where the data files reside. + + Returns + ------- + None. + + """ + pds = stationlist.toPandas() + cnt = 0 + for station in pds.station: + filename = path + station + ".dly" + if os.path.isfile(filename): + start = time() + dly = read_dly_file(spark, scon, "file://" + filename) + numrec = dly.count() + dly = dly.coalesce(1).sort('element', 'date') + read = time() + dly.write.mode('Append').parquet(GHCNDPATH + + parquetname + ".parquet") + finish = time() + print(str.format( + "{:8d} ", cnt) + station + + " #rec " + str.format("{:7d}", numrec) + + " read " + str.format("{:f}", read - start) + + " write " + str.format("{:f}", finish - read) + + " write/sec " + str.format("importDataSingleFiles{:f} ", + numrec/(finish - read)) + + " " + filename) + else: + print("#### " + str(cnt) + " File " + + filename + " does not exist ####") + cnt += 1 + + +def check_files(spark): + """ + Check if some files for generated stationnames do not exist. + + Parameters + ---------- + spark : SparkSession + The SQL session. + + Returns + ------- + None. 
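+
+    Notes
+    -----
+    Files are looked up under the hard-coded path
+    /nfs/home/steger/ghcnd/ghcnd_all/; every missing file is printed to
+    stdout together with its running index.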
+ + """ + stationlist = spark.sql( + "SELECT CONCAT(countrycode, networkcode, stationid) AS station \ + FROM ghcndstations \ + ORDER BY station") + pds = stationlist.toPandas() + count = 1 + for station in pds.station: + filename = "/nfs/home/steger/ghcnd/ghcnd_all/" + station + ".dly" + if os.path.isfile(filename): + # print(str(count) + " " + station) + pass + else: + print(str(count) + " File does not exist: " + filename) + count += 1 + + """ + Read the inventory data into a dataframe, + register it as temporary view and write it to parquet + """ + + +def import_ghcnd_inventory(scon, spark, path): + """ + Import inventory information from GHCND. + + Parameters + ---------- + scon : SparkContext + The context. + spark : SparkSession + The SQL session. + path : string + Path for inventory file. + + Returns + ------- + invframe : DataFrame + Data Frame with inventory data. + + """ + invlines = scon.textFile(path + "ghcnd-inventory.txt") + invsplitlines = invlines.map( + lambda l: + (l[0:2], + l[2:3], + l[0:11], + float(l[12:20].strip()), + float(l[21:30].strip()), + l[31:35], + int(l[36:40]), + int(l[41:45]) + )) + invschema = StructType([ + StructField('countrycode', StringType(), True), + StructField('networkcode', StringType(), True), + StructField('stationid', StringType(), True), + StructField('latitude', FloatType(), True), + StructField('longitude', FloatType(), True), + StructField('element', StringType(), True), + StructField('firstyear', IntegerType(), True), + StructField('lastyear', IntegerType(), True) + ]) + invframe = spark.createDataFrame(invsplitlines, invschema) + invframe.createOrReplaceTempView("ghcndinventory") + invframe.write.mode('overwrite').parquet( + GHCNDPATH + "ghcndinventory.parquet") + invframe.cache() + print("Imported GhcndInventory") + return invframe + + +def import_ghcnd_all(scon, spark): + """ + Import all files from GHCND. + + Parameters + ---------- + scon : SparkContext + The context. + spark : SparkSession + The SQL session. + + Returns + ------- + None. + + """ + localfilepath = "file://" + GHCNDHOMEPATH + import_ghcnd_countries(scon, spark, localfilepath) + import_ghcnd_stations(scon, spark, localfilepath) + import_ghcnd_inventory(scon, spark, localfilepath) + # import_all_data(scon, spark, GHCNDHOMEPATH) + import_data_rdd_parallel(scon, spark, localfilepath) + + +def read_ghcnd_from_parquet(spark): + """ + Read all data from the parquet files into Dataframes. + + Create temporary views from the parquet files. + + Parameters + ---------- + spark : SparkSession + The SQL Session. + + Returns + ------- + None. + + """ + dfcountries = spark.read.parquet(GHCNDPATH + "ghcndcountries") + dfcountries.createOrReplaceTempView("ghcndcountries") + dfcountries.cache() + + dfstations = spark.read.parquet(GHCNDPATH + "ghcndstations") + dfstations.createOrReplaceTempView("ghcndstations") + dfstations.cache() + + dfinventory = spark.read.parquet(GHCNDPATH + "ghcndinventory") + dfinventory.createOrReplaceTempView("ghcndinventory") + dfinventory.cache() + + dfdata = spark.read.parquet(GHCNDPATH + "ghcnddata") + dfdata.createOrReplaceTempView("ghcnddata") + dfdata.cache() + + +def delete_all_parquet_ghcnd(): + """ + Delete all parquet files that were imported from GHCND. + + Returns + ------- + None. 
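+
+    Notes
+    -----
+    Each path is removed via delete_from_hdfs, i.e. "hdfs dfs -rm -R <path>".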
+ + """ + delete_from_hdfs(GHCNDPATH + "ghcndstations.parquet") + delete_from_hdfs(GHCNDPATH + "ghcndcountries.parquet") + delete_from_hdfs(GHCNDPATH + "ghcndinventory.parquet") + delete_from_hdfs(GHCNDPATH + "ghcnddata.parquet") + + +def delete_from_hdfs(path): + """ + Delete the file in path from HDFS. + + Parameters + ---------- + path : string + Path of the file in HDFS. + + Returns + ------- + None. + + """ + call("hdfs dfs -rm -R " + path, + shell=True) \ No newline at end of file diff --git a/Aufgabe 10/sparkstart.py b/Aufgabe 10/sparkstart.py new file mode 100644 index 0000000..bdb7010 --- /dev/null +++ b/Aufgabe 10/sparkstart.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- + +""" +Erzeugen einer Spark-Konfiguration +""" + +from pyspark import SparkConf, SparkContext +from pyspark.sql import SparkSession + +# connect to cluster +conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin") +conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") +conf.set("spark.executor.memory", '32g') +conf.set("spark.driver.memory", '8g') +conf.set("spark.cores.max", "40") +scon = SparkContext(conf=conf) + + +spark = SparkSession \ + .builder \ + .appName("Python Spark SQL") \ + .getOrCreate() diff --git a/Aufgabe 8/main.py b/Aufgabe 8/main.py index 360cbb7..e097d1d 100644 --- a/Aufgabe 8/main.py +++ b/Aufgabe 8/main.py @@ -133,7 +133,7 @@ def plot_avg_tmax_day(station_name): days = [row['day'] for row in df_avg] avg_tmax = [row['avg_tmax'] for row in df_avg] - + #TODO: Mit SQL machen # 21-Tage gleitender Durchschnitt (10 Tage davor, Tag selbst, 10 Tage danach) rolling_avg = [] for i in range(len(avg_tmax)): diff --git a/Aufgabe 9/Aufgabe9.py b/Aufgabe 9/Aufgabe9.py new file mode 100644 index 0000000..cd97ba7 --- /dev/null +++ b/Aufgabe 9/Aufgabe9.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 + +from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, FloatType +from pyspark.sql import Row +import pyspark.sql.functions as F +import re + +CDC_PATH = "/data/cdc/hourly/" +HDFS_HOME = "hdfs://193.174.205.250:54310/" + + +# a) Stationsdaten einlesen & als Parquet speichern +def a(scon, spark, path=CDC_PATH): + stationlines = scon.textFile(path + "TU_Stundenwerte_Beschreibung_Stationen.txt") + + stationlines = stationlines.zipWithIndex().filter(lambda x: x[1] >= 2).map(lambda x: x[0]) + + stationsplitlines = stationlines.map(lambda l: ( + l[0:5].strip(), + l[6:14].strip(), + l[15:23].strip(), + int(l[24:41].strip()), + float(l[42:52].strip()), + float(l[53:61].strip()), + l[61:101].strip(), + l[102:].strip() + )) + + stationschema = StructType([ + StructField('stationid', StringType(), True), + StructField('from_date', StringType(), True), + StructField('to_date', StringType(), True), + StructField('height', IntegerType(), True), + StructField('latitude', FloatType(), True), + StructField('longitude', FloatType(), True), + StructField('stationname', StringType(), True), + StructField('state', StringType(), True) + ]) + + stationframe = spark.createDataFrame(stationsplitlines, schema=stationschema) + + stationframe.createOrReplaceTempView("cdc_stations") + + outfile = HDFS_HOME + "/home/kramlingermike/" + "cdc_stations.parquet" + stationframe.write.mode('overwrite').parquet(outfile) + stationframe.cache() + +# a) Beispielabfrage +def get_all_cdc_stations(spark): + result = spark.sql(f""" + SELECT * + FROM cdc_stations + ORDER BY stationname + """) + result.show(truncate=False) + +# a) Beispielabfrage +def 
get_cdc_stations_per_state(spark): + result = spark.sql(f""" + SELECT + state, + COUNT(*) AS count + FROM cdc_stations + GROUP BY state + ORDER BY count DESC + """) + result.show(truncate=False) + +def b(scon, spark): + lines = scon.textFile(CDC_PATH + "produkt*") + + lines = lines.filter(lambda line: not line.startswith("STATIONS_ID")) + lines = lines.zipWithIndex().filter(lambda x: x[1] >= 0).map(lambda x: x[0]) + + lines = lines.map(lambda l: l.split(";")) + + lines = lines.map(lambda s: ( + s[0].strip(), + s[1].strip()[:8], + int(s[1].strip()[8:]), + int(s[2].strip()), + float(s[3].strip()), + float(s[4].strip()) + )) + + schema = StructType([ + StructField("stationid", StringType(), True), + StructField("date", StringType(), True), + StructField("hour", IntegerType(), True), + StructField("qn_9", IntegerType(), True), + StructField("tt_tu", FloatType(), True), + StructField("rf_tu", FloatType(), True) + ]) + + + df = spark.createDataFrame(lines, schema) + + df.createOrReplaceTempView("cdc_hourly") + + outfile = HDFS_HOME + "home/kramlingermike/" + "cdc_hourly.parquet" + df.write.mode("overwrite").parquet(outfile) + +def get_hourly_station(spark, stationid, limit=20): + result = spark.sql(f""" + SELECT * + FROM cdc_hourly + WHERE stationid = '{stationid}' + ORDER BY date, hour + LIMIT {limit} + """) + result.show(truncate=False) + +def avg_temp_per_day(spark, stationid, limit=20): + result = spark.sql(f""" + SELECT date, ROUND(AVG(tt_tu),2) AS avg_temp + FROM cdc_hourly + WHERE stationid = '{stationid}' + GROUP BY date + ORDER BY date + LIMIT {limit} + """) + result.show(truncate=False) + + +def main(scon, spark): + """ + main(scon, spark) + """ + + print("a)") + a(scon, spark) + print("Beispielabfrage: (Alle Stationen:)") + get_all_cdc_stations(spark) + print("Beispielabfrage: (Alle Stationen pro Bundesland)") + get_cdc_stations_per_state(spark) + print("b)") + b(scon, spark) + print("Beispielabfrage: (Alle Daten für eine Station:)") + get_hourly_station(spark, "4271") + print("Beispielabfrage: (Durchschnittliche Temperatur pro Tag für eine Station:)") + avg_temp_per_day(spark, "4271") diff --git a/Aufgabe 9/sparkstart.py b/Aufgabe 9/sparkstart.py new file mode 100644 index 0000000..4b249e9 --- /dev/null +++ b/Aufgabe 9/sparkstart.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- + +""" +Erzeugen einer Spark-Konfiguration +""" + +from pyspark import SparkConf, SparkContext +from pyspark.sql import SparkSession + +# connect to cluster +conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin") +conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") +conf.set("spark.executor.memory", '32g') +conf.set("spark.driver.memory", '8g') +conf.set("spark.cores.max", "40") +scon = SparkContext(conf=conf) + +spark = SparkSession \ + .builder \ + .appName("Python Spark SQL") \ + .getOrCreate()
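+
+# Minimal usage sketch (assumption: this file and Aufgabe9.py live in the same
+# directory and the cluster configured above is reachable):
+#
+#   from sparkstart import scon, spark
+#   import Aufgabe9
+#   Aufgabe9.main(scon, spark)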