diff --git a/Aufgabe 10/Aufgabe10.py b/Aufgabe 10/Aufgabe10.py index d28c5fc..6e1d636 100644 --- a/Aufgabe 10/Aufgabe10.py +++ b/Aufgabe 10/Aufgabe10.py @@ -1,219 +1,186 @@ from sparkstart import scon, spark -import ghcnd_stations -import matplotlib.pyplot as plt +from pyspark.sql import SparkSession import time +import matplotlib.pyplot as plt -# a) Scatterplot: alle Stationen (lon/lat) -def plot_all_stations(spark): - q = """ - SELECT stationname, latitude, longitude - FROM ghcndstations - WHERE latitude IS NOT NULL AND longitude IS NOT NULL - """ - t0 = time.time() - rows = spark.sql(q).collect() - t1 = time.time() - print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Rows: {len(rows)}") - - lats = [r['latitude'] for r in rows] - lons = [r['longitude'] for r in rows] - names = [r['stationname'] for r in rows] - - plt.figure(figsize=(8,6)) - plt.scatter(lons, lats, s=10, alpha=0.6) - plt.xlabel('Longitude') - plt.ylabel('Latitude') - plt.title('Alle GHCND-Stationen (Scatter)') - plt.grid(True) - plt.show() +HDFSPATH = "hdfs://193.174.205.250:54310/" -# b) Scatterplot: Stationsdauer in Jahren als Marker-Size (aus ghcndinventory: firstyear/lastyear) -def plot_station_duration(spark, size_factor=20): - q = """ - SELECT - s.stationname, - s.latitude, - s.longitude, - (COALESCE(i.lastyear, year(current_date())) - COALESCE(i.firstyear, year(current_date()))) AS years - FROM ghcndstations s - LEFT JOIN ghcndinventory i ON s.stationid = i.stationid - WHERE s.latitude IS NOT NULL AND s.longitude IS NOT NULL - """ - t0 = time.time() - rows = spark.sql(q).collect() - t1 = time.time() - print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Rows: {len(rows)}") +def read_parquets(spark: SparkSession): + stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet" + products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet" - lats = [r['latitude'] for r in rows] - lons = [r['longitude'] for r in rows] - years = [r['years'] if r['years'] is not None else 0 for r in rows] - sizes = [max(5, (y+1) * size_factor) for y in years] + stations_df = spark.read.parquet(stations_path) + stations_df.createOrReplaceTempView("german_stations") - plt.figure(figsize=(8,6)) - plt.scatter(lons, lats, s=sizes, alpha=0.6) - plt.xlabel('Longitude') - plt.ylabel('Latitude') - plt.title('GHCND-Stationen: Dauer der Verfuegbarkeit (Größe ~ Jahre)') - plt.grid(True) - plt.show() + products_df = spark.read.parquet(products_path) + products_df.createOrReplaceTempView("german_stations_data") + + stations_df.cache() + products_df.cache() -def plot_frost_distribution_year(spark, year): - q = f""" - WITH daily_max AS ( - SELECT stationid, date, MAX(CAST(value AS DOUBLE))/10.0 AS max_temp - FROM ghcnddata - WHERE element = 'TMAX' - AND length(date) >= 4 - AND substr(date,1,4) = '{year}' - GROUP BY stationid, date - ), - station_frost AS ( - SELECT dm.stationid, SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays - FROM daily_max dm - GROUP BY dm.stationid - ) - SELECT sf.frostdays, COUNT(*) AS stations - FROM station_frost sf - GROUP BY sf.frostdays - ORDER BY sf.frostdays - """ - t0 = time.time() - rows = spark.sql(q).collect() - t1 = time.time() - print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Distinct frostdays: {len(rows)}") +def plot_all_stations(spark: SparkSession): + q = "SELECT geo_laenge AS lon, geo_breite AS lat FROM german_stations WHERE geo_laenge IS NOT NULL AND geo_breite IS NOT NULL" + df = spark.sql(q) - if not rows: - print(f"Keine Daten f\u00fcr Jahr {year}.") - return - - x 
= [r['frostdays'] for r in rows] - y = [r['stations'] for r in rows] - - plt.figure(figsize=(8,5)) - plt.bar(x, y) - plt.xlabel('Anzahl Frosttage im Jahr ' + str(year)) - plt.ylabel('Anzahl Stationen') - plt.title(f'Verteilung der Frosttage pro Station im Jahr {year}') - plt.grid(True) - plt.show() + pdf = df.toPandas() + plt.figure(figsize=(8, 6)) + plt.scatter(pdf.lon, pdf.lat, s=6, color='red', marker='.') + plt.xlabel('Longitude') + plt.ylabel('Latitude') + plt.title('All Stations (locations)') + plt.tight_layout() + plt.show() -# c2) Frosttage Zeitreihe für eine Station mit 5- und 20-Jahres Durchschnitt (SQL window) -def plot_station_frost_timeseries(spark, station_name): - q = f""" - WITH daily_max AS ( - SELECT stationid, date, MAX(CAST(value AS DOUBLE))/10.0 AS max_temp - FROM ghcnddata - WHERE element = 'TMAX' - GROUP BY stationid, date - ), - yearly AS ( - SELECT - dm.stationid, - CAST(substr(dm.date,1,4) AS INT) AS year, - SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays - FROM daily_max dm - GROUP BY dm.stationid, CAST(substr(dm.date,1,4) AS INT) - ), - station_yearly AS ( - SELECT - y.year, - y.frostdays, - AVG(y.frostdays) OVER (ORDER BY y.year ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg5, - AVG(y.frostdays) OVER (ORDER BY y.year ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg20 - FROM yearly y - JOIN ghcndstations s ON y.stationid = s.stationid - WHERE trim(upper(s.stationname)) = '{station_name.upper()}' - ORDER BY y.year - ) - SELECT * FROM station_yearly - """ - t0 = time.time() - rows = spark.sql(q).collect() - t1 = time.time() - print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Years: {len(rows)}") +def duration_circle_size(spark: SparkSession): + q = ( + "SELECT stationId, geo_laenge AS lon, geo_breite AS lat, " + "(CAST(SUBSTR(bis_datum,1,4) AS INT) - CAST(SUBSTR(von_datum,1,4) AS INT)) AS duration_years " + "FROM german_stations " + "WHERE TRIM(von_datum)<>'' AND TRIM(bis_datum)<>''" + ) + df = spark.sql(q) - if not rows: - print(f"Keine Daten f\u00fcr Station '{station_name}'.") - return + pdf = df.toPandas() - years = [r['year'] for r in rows] - frostdays = [r['frostdays'] for r in rows] - avg5 = [r['avg5'] for r in rows] - avg20 = [r['avg20'] for r in rows] + pdf['duration_years'] = pdf['duration_years'].fillna(0).astype(int) + sizes = (pdf['duration_years'].clip(lower=0) + 1) * 6 - plt.figure(figsize=(10,5)) - plt.plot(years, frostdays, label='Frosttage (Jahr)') - plt.plot(years, avg5, label='5-Jahres-Durchschnitt') - plt.plot(years, avg20, label='20-Jahres-Durchschnitt') - plt.xlabel('Jahr') - plt.ylabel('Anzahl Frosttage') - plt.title(f'Frosttage f\u00fcr Station {station_name}') - plt.legend() - plt.grid(True) - plt.show() + plt.figure(figsize=(8, 6)) + plt.scatter(pdf.lon, pdf.lat, s=sizes, alpha=0.6, c=pdf['duration_years'], cmap='viridis') + plt.colorbar(label='Duration (years)') + plt.xlabel('Longitude') + plt.ylabel('Latitude') + plt.title('Stations with duration (years) as marker size') + plt.tight_layout() + plt.show() -# d) Korrelation Hoehe (elevation) vs. 
Frosttage pro Jahr -def plot_height_frost_correlation(spark): - q = """ - WITH daily_max AS ( - SELECT stationid, date, MAX(CAST(value AS DOUBLE))/10.0 AS max_temp - FROM ghcnddata - WHERE element = 'TMAX' - GROUP BY stationid, date - ), - yearly AS ( - SELECT - dm.stationid, - CAST(substr(dm.date,1,4) AS INT) AS year, - SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays - FROM daily_max dm - GROUP BY dm.stationid, CAST(substr(dm.date,1,4) AS INT) - ), - joined AS ( - SELECT y.year, s.elevation, y.frostdays - FROM yearly y - JOIN ghcndstations s ON y.stationid = s.stationid - WHERE s.elevation IS NOT NULL - ), - yearly_corr AS ( - SELECT year, corr(elevation, frostdays) AS corr - FROM joined - GROUP BY year - ORDER BY year - ) - SELECT year, corr FROM yearly_corr WHERE corr IS NOT NULL - """ - t0 = time.time() - rows = spark.sql(q).collect() - t1 = time.time() - print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Years with corr: {len(rows)}") +def compute_daily_and_yearly_frosts(spark: SparkSession): + q_daily_max = ( + "SELECT stationId, date, SUBSTR(date,1,4) AS year, MAX(TT_TU) AS max_temp " + "FROM german_stations_data " + "GROUP BY stationId, date" + ) + daily_max = spark.sql(q_daily_max) + daily_max.createOrReplaceTempView('daily_max') - if not rows: - print("Keine Korrelationsdaten verfügbar.") - return + # mark a day as frost if max_temp < 0 + q_daily_frost = ( + "SELECT stationId, year, CASE WHEN max_temp < 0 THEN 1 ELSE 0 END AS is_frost " + "FROM daily_max" + ) + daily_frost = spark.sql(q_daily_frost) + daily_frost.createOrReplaceTempView('daily_frost') - years = [r['year'] for r in rows] - corr = [r['corr'] for r in rows] + # yearly frostdays per station + q_station_year = ( + "SELECT stationId, year, SUM(is_frost) AS frost_days " + "FROM daily_frost GROUP BY stationId, year" + ) + station_year_frost = spark.sql(q_station_year) + station_year_frost.createOrReplaceTempView('station_year_frost') - plt.figure(figsize=(10,5)) - plt.bar(years, corr) - plt.xlabel('Jahr') - plt.ylabel('Korrelationskoeffizient (elevation vs frostdays)') - plt.title('Korrelation Hoehe (elevation) vs. 
Frosttage pro Jahr') - plt.grid(True) - plt.show() + +def frost_analysis(spark: SparkSession, year=2024, station_name_matches=('kempten',)): + compute_daily_and_yearly_frosts(spark) + + q_hist = ( + f"SELECT frost_days, COUNT(*) AS station_count " + f"FROM station_year_frost WHERE year = '{year}' GROUP BY frost_days ORDER BY frost_days" + ) + hist_df = spark.sql(q_hist) + + hist_pdf = hist_df.toPandas() + plt.figure(figsize=(8, 5)) + plt.bar(hist_pdf.frost_days, hist_pdf.station_count, color='steelblue') + plt.xlabel('Number of Frost Days in year ' + str(year)) + plt.ylabel('Number of Stations') + plt.title(f'Stations vs Frost Days ({year})') + plt.tight_layout() + plt.show() + + for name in station_name_matches: + q_find = f"SELECT stationId, station_name FROM german_stations WHERE lower(station_name) LIKE '%{name.lower()}%'" + ids_df = spark.sql(q_find) + ids = ids_df.collect() + if not ids: + print(f"No stations found matching '{name}'") + continue + for r in ids: + sid = r['stationId'] + sname = r['station_name'] + print(f"Analyzing stationId={sid} name={sname}") + + # compute frostdays + 5-yr and 20-yr rolling averages using window frame + q_ts = ( + "SELECT year, frost_days, " + "AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_5, " + "AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg_20 " + f"FROM station_year_frost WHERE stationId = {sid} ORDER BY CAST(year AS INT)" + ) + ts_df = spark.sql(q_ts) + + pdf = ts_df.toPandas() + if pdf.empty: + print(f"No yearly frost data for station {sid}") + continue + + pdf['year'] = pdf['year'].astype(int) + plt.figure(figsize=(10, 5)) + plt.plot(pdf.year, pdf.frost_days, label='Frostdays (year)', marker='o') + plt.plot(pdf.year, pdf.avg_5, label='5-year avg', linestyle='--') + plt.plot(pdf.year, pdf.avg_20, label='20-year avg', linestyle=':') + plt.xlabel('Year') + plt.ylabel('Frost Days') + plt.title(f'Frost Days over Years for {sname} (station {sid})') + plt.legend() + plt.tight_layout() + plt.show() + + +def height_frost_correlation(spark: SparkSession): + compute_daily_and_yearly_frosts(spark) + + q_corr = ( + "SELECT syf.year AS year, corr(s.hoehe, syf.frost_days) AS height_frost_corr " + "FROM station_year_frost syf JOIN german_stations s ON syf.stationId = s.stationId " + "GROUP BY syf.year ORDER BY CAST(syf.year AS INT)" + ) + + corr_df = spark.sql(q_corr) + + corr_pdf = corr_df.toPandas() + + corr_pdf = corr_pdf.dropna(subset=['height_frost_corr']) + if corr_pdf.empty: + print("No non-NaN correlation values found.") + return + + corr_pdf['year'] = corr_pdf['year'].astype(int) + plt.figure(figsize=(10, 5)) + plt.bar(corr_pdf.year, corr_pdf.height_frost_corr, color='orange') + plt.xlabel('Year') + plt.ylabel('Correlation (height vs frostdays)') + plt.title('Yearly correlation: station height vs number of frost days') + plt.tight_layout() + plt.show() + + +def main(scon, spark): + read_parquets(spark) + + plot_all_stations(spark) + + duration_circle_size(spark) + + frost_analysis(spark, year=2024, station_name_matches=('kempten',)) + + height_frost_correlation(spark) if __name__ == '__main__': - ghcnd_stations.read_ghcnd_from_parquet(spark) + main(scon, spark) - plot_all_stations(spark) - plot_station_duration(spark) - plot_frost_distribution_year(spark, '2010') - plot_station_frost_timeseries(spark, 'KEMPTEN') - plot_height_frost_correlation(spark) - pass diff --git a/Aufgabe 10/ghcnd_stations.py 
b/Aufgabe 10/ghcnd_stations.py deleted file mode 100644 index 06eb7bd..0000000 --- a/Aufgabe 10/ghcnd_stations.py +++ /dev/null @@ -1,689 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Load stations, countries, inventory and data from GHCND as Dataset. - -@author: steger - -""" - -# pylint: disable=pointless-string-statement - -import os -from datetime import date -from time import time -from subprocess import call -from pyspark.sql.types import StructType -from pyspark.sql.types import StructField -from pyspark.sql.types import StringType -from pyspark.sql.types import FloatType -from pyspark.sql.types import IntegerType -from pyspark.sql.types import DateType - - -# ============================================= -# run sparkstart.py before to create a session -# ============================================= - -HDFSPATH = "hdfs://193.174.205.250:54310/" -GHCNDPATH = HDFSPATH + "ghcnd/" -GHCNDHOMEPATH = "/data/ghcnd/" - - -def conv_elevation(elev): - """ - Convert an elevation value. - - -999.9 means there is no value. - - Parameters - ---------- - elev : string - The elevation to convert to float. - - Returns - ------- - res : numeric - The converted value as float. - """ - elev = elev.strip() - if elev == "-999.9": - res = None - else: - res = float(elev) - return res - - -def conv_data_value(line, start): - """ - Convert a single data value from a dly.- File. - - Parameters - ---------- - line : string - The line with the data value. - start : int - The index at which the value starts. - - Returns - ------- - res : numeric - The onverted data value as int. - """ - return int(line[start:start+5].strip()) - - -def import_ghcnd_stations(scon, spark, path): - """ - Read the station data into a dataframe. - - Register it as temporary view and write it to parquet. - - Parameters - ---------- - scon : SparkContext - The spark context. - spark : SparkSession - The SQL session. - - Returns - ------- - stationFrame : DataFrame - The spark Data Frame with the stations data. - """ - stationlines = scon.textFile(path + "ghcnd-stations.txt") - stationsplitlines = stationlines.map( - lambda l: - (l[0:2], - l[2:3], - l[0:11], - float(l[12:20].strip()), - float(l[21:30].strip()), - conv_elevation(l[31:37]), - l[41:71] - )) - stationschema = StructType([ - StructField('countrycode', StringType(), True), - StructField('networkcode', StringType(), True), - StructField('stationid', StringType(), True), - StructField('latitude', FloatType(), True), - StructField('longitude', FloatType(), True), - StructField('elevation', FloatType(), True), - StructField('stationname', StringType(), True) - ]) - stationframe = spark.createDataFrame(stationsplitlines, - schema=stationschema) - stationframe.createOrReplaceTempView("ghcndstations") - stationframe.write.mode('overwrite').parquet( - GHCNDPATH + "ghcndstations.parquet") - stationframe.cache() - print("Imported GhcndStations") - return stationframe - - -def import_ghcnd_countries(scon, spark, path): - """ - Read the countries data into a dataframe. - - Register it as temptable and write it to parquet. - - Parameters - ---------- - scon : SparkContext - The spark context. - spark : SparkSession - The SQL session. - path : string - The path where the file with data resides. - - Returns - ------- - stationFrame : DataFrame - The spark Data Frame with the countries data. 
- """ - countrylines = scon.textFile(path + "ghcnd-countries.txt") - countrysplitlines = countrylines.map(lambda l: (l[0:2], l[2:50])) - countryschema = StructType([ - StructField('countrycode', StringType(), True), - StructField('countryname', StringType(), True)]) - countryframe = spark.createDataFrame(countrysplitlines, countryschema) - countryframe.createOrReplaceTempView("ghcndcountries") - countryframe.write.mode('overwrite').parquet( - GHCNDPATH + "ghcndcountries.parquet") - countryframe.cache() - print("Imported GhcndCountries") - return countryframe - - -def conv_data_line(line): - """ - Convert a data line from GHCND-Datafile (.dly). - - Parameters - ---------- - line : string - String with a data line containing the values for one month. - - Returns - ------- - list of tuple - List containing a tuple for each data value. - """ - if line == '': - return [] - - countrycode = line[0:2] - networkcode = line[2:3] - stationid = line[0:11] - year = int(line[11:15]) - month = int(line[15:17]) - element = line[17:21] - datlst = [] - for i in range(0, 30): - val = conv_data_value(line, 21 + i*8) - if val != -9999: - datlst.append((countrycode, networkcode, stationid, - year, month, i+1, - date(year, month, i+1), - element, - val)) - return datlst - - -def read_dly_file(scon, spark, filename): - """ - Read a .dly-file into a data frame. - - Parameters - ---------- - scon : SparkContext - The spark context. - spark : SparkSession - The SQL session. - filename : string - The name and path of the dly-File. - - Returns - ------- - RDD - The RDD with the contents of the dly-File. - """ - dly = scon.textFile(filename) - return process_dly_file_lines(spark, dly) - - -def process_dly_file_lines(spark, lines): - """ - Process the lines of one dly file. - - Parameters - ---------- - spark : SparkSession - The SQL session. - lines : RDD - RDD with one value per line. - - Returns - ------- - dlyFrame : DataFram - Data Frame containing the data of the file. - - """ - dlsplit = lines.flatMap(conv_data_line) - dlyfileschema = StructType([ - StructField('countrycode', StringType(), True), - StructField('networkcode', StringType(), True), - StructField('stationid', StringType(), True), - StructField('year', IntegerType(), True), - StructField('month', IntegerType(), True), - StructField('day', IntegerType(), True), - StructField('date', DateType(), True), - StructField('element', StringType(), True), - StructField('value', IntegerType(), True) - ]) - dlyframe = spark.createDataFrame(dlsplit, dlyfileschema) - return dlyframe - - -def import_data_rdd_parallel(scon, spark, path): - """ - Import the data files from ghcnd in parallel. - - This is much faster on a cluster or a computer with many cores - and enough main memory to hold all the raw data. - - Parameters - ---------- - scon : SparkContext - The context. - spark : SparkSession - The SQL session. - - Returns - ------- - None. 
- """ - rdd = scon.textFile( - path+"/ghcnd_all/*.dly", minPartitions=5000) - rddcoa = rdd.coalesce(5000) - - rddsplit = rddcoa.flatMap(conv_data_line) - print("Number of data records = " + str(rddsplit.count())) - print("Number of partitions = " + str(rddsplit.getNumPartitions())) - - dlyfileschema = StructType([ - StructField('countrycode', StringType(), True), - StructField('networkcode', StringType(), True), - StructField('stationid', StringType(), True), - StructField('year', IntegerType(), True), - StructField('month', IntegerType(), True), - StructField('day', IntegerType(), True), - StructField('date', DateType(), True), - StructField('element', StringType(), True), - StructField('value', IntegerType(), True) - ]) - dlyframe = spark.createDataFrame(rddsplit, dlyfileschema) - - dlyframe.show(10) - - dlyframe.write.mode('overwrite').parquet( - GHCNDPATH + "ghcnddata.parquet") - print(os.system("hdfs dfs -du -s /ghcnd/ghcnddata.parquet")) - - -def import_data_rdd_parallel_whole(scon, spark, path): - """ - Import the data files from ghcnd in parallel. - - This is much faster on a cluster or a computer with many cores - and enough main memory to hold all the raw data. - - Parameters - ---------- - scon : SparkContext - The context. - spark : SparkSession - The SQL session. - - Returns - ------- - None. - """ - rdd = scon.wholeTextFiles( - path+"/ghcnd_all/*.dly", minPartitions=5000 ) - - rddvals = rdd.values() - print("Number of files in GHCND = " + str(rddvals.count())) - rddlen = rddvals.map(len) - print("Number of characters in all files = " + - str(rddlen.reduce(lambda x, y: x + y))) - - rddlines = rddvals.flatMap(lambda x: x.split("\n")) - print("Number of lines with data = " + str(rddlines.count())) - - rddsplit = rddlines.flatMap(conv_data_line) - print("Number of data records = " + str(rddsplit.count())) - print("Number of partitions = " + str(rddsplit.getNumPartitions())) - - dlyfileschema = StructType([ - StructField('countrycode', StringType(), True), - StructField('networkcode', StringType(), True), - StructField('stationid', StringType(), True), - StructField('year', IntegerType(), True), - StructField('month', IntegerType(), True), - StructField('day', IntegerType(), True), - StructField('date', DateType(), True), - StructField('element', StringType(), True), - StructField('value', IntegerType(), True) - ]) - dlyframe = spark.createDataFrame(rddsplit, dlyfileschema) - - dlyframe.show(10) - - dlyframe.write.mode('overwrite').parquet( - GHCNDPATH + "ghcnddata.parquet") - print(os.system("hdfs dfs -du -s /ghcnd/ghcnddata.parquet")) - - """ - Code for testing problems that resulted finally from empty lines - to solve the problem the code - if line == '': - return [] - was added at the beginning of convDataLine to filter away empty lines: - - noyear = rddsplit.filter(lambda x: not x[3].isnumeric()) - noyear.collect() - - rddlines1 = rdd.flatMap(lambda x: [(x[0], y) for y in x[1].split("\n")]) - print(rddlines1.count()) - - rddsplit1 = rddlines1.flatMap(convDataLine1) - print(rddsplit1.count()) - - noyear1 = rddsplit1.filter(lambda x: not x[1][3].isnumeric()) - noyear1.collect() - """ - - -def import_ghcnd_files_extern(scon, spark, path, stationlist, batchsize, - numparts): - """ - Import multiple data files in one batch. - - Import batchsize data files in one batch and append the data into - the parquet file. - - Parameters - ---------- - scon : SparkContext - The context. - spark : SparkSession - The SQL session. - path : string - Path of the data files. 
- stationlist : list - List of all stations to load. - batchsize : int - Number of files to load in one batch. - numparts : int - Number of partitions to write one batch. - - Returns - ------- - None. - - """ - data = None - count = 0 - allcount = 0 - batchcount = 0 - for station in stationlist: - # filename = "file://" + path + "/" + station + ".dly" - filename = path + station + ".dly" - if os.path.isfile(filename): - dly = read_dly_file(spark, scon, "file://" + filename) - if data is not None: - data = data.union(dly) - print("Batch " + str(batchcount) + - " Filenr " + str(count) + " Processing " + filename) - else: - tstart = time() - data = dly - count += 1 - if count >= batchsize: - # data = data.sort('countrycode', 'stationid', 'date') - data = data.coalesce(numparts) - tcoalesce = time() - data.write.mode('Append').parquet( - GHCNDPATH + "ghcnddata.parquet") - anzrec = data.count() - twrite = time() - print( - "\n\nBatch " + str(batchcount) + - " #recs " + str(anzrec) + - " #files " + str(allcount) + - " readtime " + str.format("{:f}", tcoalesce - tstart) + - " writetime " + str.format("{:f}", twrite - tcoalesce) + - " recs/sec " + - str.format("{:f}", anzrec / (twrite - tstart)) + "\n\n") - allcount += count - count = 0 - batchcount += 1 - data = None - else: - print("importGhcndFilesExtern: " + station + - ", " + filename + " not found") - if data is not None: - data = data.coalesce(numparts) - data.write.mode('Append').parquet(GHCNDPATH + "ghcnddata.parquet") - - -def import_all_data(scon, spark, path): - """ - Import all data from GHCND. - - Parameters - ---------- - scon : SparkContext - The context. - spark : SparkSession - The SQL session. - path : string - Path of data files. - - Returns - ------- - None. - - """ - stationlist = spark.sql( - "SELECT stationid AS station \ - FROM ghcndstations \ - ORDER BY station") - pds = stationlist.toPandas() - import_ghcnd_files_extern(scon, spark, path + "ghcnd_all/", - pds.station, 30, 1) - - -def import_data_single_files(scon, spark, stationlist, parquetname, path): - """ - Import the data files one by one. - - Parameters - ---------- - scon : SparkContext - The context. - spark : SparkSession - The SQL session. - stationlist : list - List of all stations to import data. - parquetname : string - Name of the parquet file to write the data to. - path : string - Path where the data files reside. - - Returns - ------- - None. - - """ - pds = stationlist.toPandas() - cnt = 0 - for station in pds.station: - filename = path + station + ".dly" - if os.path.isfile(filename): - start = time() - dly = read_dly_file(spark, scon, "file://" + filename) - numrec = dly.count() - dly = dly.coalesce(1).sort('element', 'date') - read = time() - dly.write.mode('Append').parquet(GHCNDPATH - + parquetname + ".parquet") - finish = time() - print(str.format( - "{:8d} ", cnt) + station + - " #rec " + str.format("{:7d}", numrec) + - " read " + str.format("{:f}", read - start) + - " write " + str.format("{:f}", finish - read) + - " write/sec " + str.format("importDataSingleFiles{:f} ", - numrec/(finish - read)) - + " " + filename) - else: - print("#### " + str(cnt) + " File " + - filename + " does not exist ####") - cnt += 1 - - -def check_files(spark): - """ - Check if some files for generated stationnames do not exist. - - Parameters - ---------- - spark : SparkSession - The SQL session. - - Returns - ------- - None. 
- - """ - stationlist = spark.sql( - "SELECT CONCAT(countrycode, networkcode, stationid) AS station \ - FROM ghcndstations \ - ORDER BY station") - pds = stationlist.toPandas() - count = 1 - for station in pds.station: - filename = "/nfs/home/steger/ghcnd/ghcnd_all/" + station + ".dly" - if os.path.isfile(filename): - # print(str(count) + " " + station) - pass - else: - print(str(count) + " File does not exist: " + filename) - count += 1 - - """ - Read the inventory data into a dataframe, - register it as temporary view and write it to parquet - """ - - -def import_ghcnd_inventory(scon, spark, path): - """ - Import inventory information from GHCND. - - Parameters - ---------- - scon : SparkContext - The context. - spark : SparkSession - The SQL session. - path : string - Path for inventory file. - - Returns - ------- - invframe : DataFrame - Data Frame with inventory data. - - """ - invlines = scon.textFile(path + "ghcnd-inventory.txt") - invsplitlines = invlines.map( - lambda l: - (l[0:2], - l[2:3], - l[0:11], - float(l[12:20].strip()), - float(l[21:30].strip()), - l[31:35], - int(l[36:40]), - int(l[41:45]) - )) - invschema = StructType([ - StructField('countrycode', StringType(), True), - StructField('networkcode', StringType(), True), - StructField('stationid', StringType(), True), - StructField('latitude', FloatType(), True), - StructField('longitude', FloatType(), True), - StructField('element', StringType(), True), - StructField('firstyear', IntegerType(), True), - StructField('lastyear', IntegerType(), True) - ]) - invframe = spark.createDataFrame(invsplitlines, invschema) - invframe.createOrReplaceTempView("ghcndinventory") - invframe.write.mode('overwrite').parquet( - GHCNDPATH + "ghcndinventory.parquet") - invframe.cache() - print("Imported GhcndInventory") - return invframe - - -def import_ghcnd_all(scon, spark): - """ - Import all files from GHCND. - - Parameters - ---------- - scon : SparkContext - The context. - spark : SparkSession - The SQL session. - - Returns - ------- - None. - - """ - localfilepath = "file://" + GHCNDHOMEPATH - import_ghcnd_countries(scon, spark, localfilepath) - import_ghcnd_stations(scon, spark, localfilepath) - import_ghcnd_inventory(scon, spark, localfilepath) - # import_all_data(scon, spark, GHCNDHOMEPATH) - import_data_rdd_parallel(scon, spark, localfilepath) - - -def read_ghcnd_from_parquet(spark): - """ - Read all data from the parquet files into Dataframes. - - Create temporary views from the parquet files. - - Parameters - ---------- - spark : SparkSession - The SQL Session. - - Returns - ------- - None. - - """ - dfcountries = spark.read.parquet(GHCNDPATH + "ghcndcountries") - dfcountries.createOrReplaceTempView("ghcndcountries") - dfcountries.cache() - - dfstations = spark.read.parquet(GHCNDPATH + "ghcndstations") - dfstations.createOrReplaceTempView("ghcndstations") - dfstations.cache() - - dfinventory = spark.read.parquet(GHCNDPATH + "ghcndinventory") - dfinventory.createOrReplaceTempView("ghcndinventory") - dfinventory.cache() - - dfdata = spark.read.parquet(GHCNDPATH + "ghcnddata") - dfdata.createOrReplaceTempView("ghcnddata") - dfdata.cache() - - -def delete_all_parquet_ghcnd(): - """ - Delete all parquet files that were imported from GHCND. - - Returns - ------- - None. 
- - """ - delete_from_hdfs(GHCNDPATH + "ghcndstations.parquet") - delete_from_hdfs(GHCNDPATH + "ghcndcountries.parquet") - delete_from_hdfs(GHCNDPATH + "ghcndinventory.parquet") - delete_from_hdfs(GHCNDPATH + "ghcnddata.parquet") - - -def delete_from_hdfs(path): - """ - Delete the file in path from HDFS. - - Parameters - ---------- - path : string - Path of the file in HDFS. - - Returns - ------- - None. - - """ - call("hdfs dfs -rm -R " + path, - shell=True) \ No newline at end of file diff --git a/Aufgabe 9/Aufgabe9.py b/Aufgabe 9/Aufgabe9.py index 946f845..8b65214 100644 --- a/Aufgabe 9/Aufgabe9.py +++ b/Aufgabe 9/Aufgabe9.py @@ -1,144 +1,167 @@ from sparkstart import scon, spark +from pyspark import SparkContext, rdd +from pyspark.sql import SparkSession +from pyspark.sql.types import StructType +from pyspark.sql.types import StructField +from pyspark.sql.types import StringType +from pyspark.sql.types import FloatType +from pyspark.sql.types import IntegerType -from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, FloatType -from pyspark.sql import Row -import pyspark.sql.functions as F -import re +import matplotlib.pyplot as plt -CDC_PATH = "/data/cdc/hourly/" -HDFS_HOME = "hdfs://193.174.205.250:54310/" +HDFSPATH = "hdfs://193.174.205.250:54310/" +GHCNDPATH = HDFSPATH + "ghcnd/" +GHCNDHOMEPATH = "/data/ghcnd/" -# a) Stationsdaten einlesen & als Parquet speichern -def a(scon, spark, path=CDC_PATH): - stationlines = scon.textFile(path + "TU_Stundenwerte_Beschreibung_Stationen.txt") +# Aufgabe 9 a - stationlines = stationlines.zipWithIndex().filter(lambda x: x[1] >= 2).map(lambda x: x[0]) - - stationsplitlines = stationlines.map(lambda l: ( - l[0:5].strip(), - l[6:14].strip(), - l[15:23].strip(), - int(l[24:41].strip()), - float(l[42:52].strip()), - float(l[53:61].strip()), - l[61:101].strip(), - l[102:].strip() - )) +def import_data(spark: SparkSession, scon: SparkContext): + """ + %time import_data(spark, scon) + """ - stationschema = StructType([ - StructField('stationid', StringType(), True), - StructField('from_date', StringType(), True), - StructField('to_date', StringType(), True), - StructField('height', IntegerType(), True), - StructField('latitude', FloatType(), True), - StructField('longitude', FloatType(), True), - StructField('stationname', StringType(), True), - StructField('state', StringType(), True) - ]) - - stationframe = spark.createDataFrame(stationsplitlines, schema=stationschema) - - stationframe.createOrReplaceTempView("cdc_stations") - - outfile = HDFS_HOME + "/home/kramlingermike/" + "cdc_stations.parquet" - stationframe.write.mode('overwrite').parquet(outfile) - stationframe.cache() - -# a) Beispielabfrage -def get_all_cdc_stations(spark): - result = spark.sql(f""" - SELECT * - FROM cdc_stations - ORDER BY stationname - """) - result.show(truncate=False) - -# a) Beispielabfrage -def get_cdc_stations_per_state(spark): - result = spark.sql(f""" - SELECT - state, - COUNT(*) AS count - FROM cdc_stations - GROUP BY state - ORDER BY count DESC - """) - result.show(truncate=False) - -def b(scon, spark): - lines = scon.textFile(CDC_PATH + "produkt*") + # Daten in RDD einlesen + rdd_station = scon.textFile("/data/cdc/hourly/TU_Stundenwerte_Beschreibung_Stationen.txt") - lines = lines.filter(lambda line: not line.startswith("STATIONS_ID")) - lines = lines.zipWithIndex().filter(lambda x: x[1] >= 0).map(lambda x: x[0]) - - lines = lines.map(lambda l: l.split(";")) - - lines = lines.map(lambda s: ( - s[0].strip(), - s[1].strip()[:8], - 
int(s[1].strip()[8:]), - int(s[2].strip()), - float(s[3].strip()), - float(s[4].strip()) - )) - - schema = StructType([ - StructField("stationid", StringType(), True), - StructField("date", StringType(), True), - StructField("hour", IntegerType(), True), - StructField("qn_9", IntegerType(), True), - StructField("tt_tu", FloatType(), True), - StructField("rf_tu", FloatType(), True) - ]) - - - df = spark.createDataFrame(lines, schema) - - df.createOrReplaceTempView("cdc_hourly") - - outfile = HDFS_HOME + "home/kramlingermike/" + "cdc_hourly.parquet" - df.write.mode("overwrite").parquet(outfile) - -def get_hourly_station(spark, stationid, limit=20): - result = spark.sql(f""" - SELECT * - FROM cdc_hourly - WHERE stationid = '{stationid}' - ORDER BY date, hour - LIMIT {limit} - """) - result.show(truncate=False) - -def avg_temp_per_day(spark, stationid, limit=20): - result = spark.sql(f""" - SELECT date, ROUND(AVG(tt_tu),2) AS avg_temp - FROM cdc_hourly - WHERE stationid = '{stationid}' - GROUP BY date - ORDER BY date - LIMIT {limit} - """) - result.show(truncate=False) + # Entfernen der ersten beiden Zeilen (Header und Trennzeile) + rdd_station_filterd = (rdd_station + .zipWithIndex() # jede Zeile bekommt idx + .filter(lambda x: x[1] >= 2) # nur Zeilen mit idx >= 2 behalten + .map(lambda x: x[0])) # idx wieder entfernen + rdd_station_splitlines = rdd_station_filterd.map( + lambda l: ( + int(l[:6].strip()), # Station ID + l[6:15], # von_datum + l[15:24], # bis_datum + float(l[24:40].strip()), # stations höhe + float(l[40:53].strip()), # geoBreite + float(l[53:61].strip()), # geoHöhe + l[61:142], # Stationsname + l[142:-1] # Bundesland + )) + + # Datenschema festlegen + stationschema = StructType( + [ + StructField("stationId", IntegerType(), True), + StructField("von_datum", StringType(), True), + StructField("bis_datum", StringType(), True), + StructField("hoehe", FloatType(), True), + StructField("geo_breite", FloatType(), True), + StructField("geo_laenge", FloatType(), True), + StructField("station_name", StringType(), True), + StructField("bundesland", StringType(), True) + ] + ) + + # Data Frame erzeugen + stationframe = spark.createDataFrame(rdd_station_splitlines, schema=stationschema) + stationframe.printSchema() + + # Temporäre View erzeugen + stationframe.createOrReplaceTempView("german_stations") + + # Data Frame in HDFS speichern + stationframe.write.mode("overwrite").parquet( + HDFSPATH + "home/heiserervalentin/german_stations.parquet" + ) + + + +def read_data_from_parquet(spark): + """ + read_data_from_parquet(spark) + """ + df = spark.read.parquet(HDFSPATH + "home/heiserervalentin/german_stations.parquet") + df.createOrReplaceTempView("german_stations") + df.cache() + +def sql_querys(spark): + """ + sql_querys(spark) + """ + spark.sql("SELECT * FROM german_stations").show(5, truncate=False) + spark.sql("SELECT COUNT(*) AS Anzahl FROM german_stations").show() + spark.sql("SELECT MAX(geo_breite) FROM german_stations").show() + df = spark.sql("SELECT * FROM german_stations").toPandas() + + plt.figure(figsize=[6,6]) + plt.scatter(df.geo_laenge, df.geo_breite, marker='.', color = 'r') + + plt.show() + + +def import_produkt_files(spark: SparkSession, scon: SparkContext, path='/data/cdc/hourly/'): + """ + import_produkt_files(spark, scon) + """ + + # Daten in RDD einlesen + rdd_produkt = scon.textFile(f"{path}/produkt*") + + # Kopfzeile und Leerzeichen filtern + rdd_filterd = rdd_produkt \ + .filter(lambda l: l != 'STATIONS_ID;MESS_DATUM;QN_9;TT_TU;RF_TU;eor') \ + .map(lambda l: 
[x.strip() for x in l.split(';')]) + + # Zeilen in Felder aufteilen + rdd_produkt_splitlines = rdd_filterd.map( + lambda l: ( + int(l[0]), # Stat_id + l[1][:8], # Messdatum + int(l[1][8:10]), # Messstunde + int(l[2]), # Qualitätsniveau + float(l[3]), # Lufttemp. + float(l[4]), # rel. Luftfeuchte + int(l[1][0:4]) # jahr + ) + ) + + print(rdd_produkt_splitlines.take(5)) + + # Datenschema definieren + product_schema = StructType( + [ + StructField("stationId", IntegerType(), True), + StructField("date", StringType(), True), + StructField("hour", IntegerType(), True), + StructField("QN_9", IntegerType(), True), + StructField("TT_TU", FloatType(), True), + StructField("RF_TU", FloatType(), True), + StructField("jahr", IntegerType(), True) + ] + ) + + product_frame = spark.createDataFrame(rdd_produkt_splitlines, schema=product_schema) + product_frame.printSchema() + product_frame.createOrReplaceTempView("german_stations_data") + + + product_frame.write.mode("overwrite").parquet( + HDFSPATH + "home/heiserervalentin/german_stations_data.parquet" + ) + + + +def read_product_data_from_parquet(spark): + """ + read_product_data_from_parquet(spark) + """ + df = spark.read.parquet(HDFSPATH + "home/heiserervalentin/german_stations_data.parquet") + df.createOrReplaceTempView("german_stations_data") + df.cache() + def main(scon, spark): - """ - main(scon, spark) - """ + # Daten importieren + import_data(spark, scon) + read_data_from_parquet(spark) + sql_querys(spark) - print("a)") - a(scon, spark) - print("Beispielabfrage: (Alle Stationen:)") - get_all_cdc_stations(spark) - print("Beispielabfrage: (Alle Stationen pro Bundesland)") - get_cdc_stations_per_state(spark) - print("b)") - b(scon, spark) - print("Beispielabfrage: (Alle Daten für eine Station:)") - get_hourly_station(spark, "4271") - print("Beispielabfrage: (Durchschnittliche Temperatur pro Tag für eine Station:)") - avg_temp_per_day(spark, "4271") + import_produkt_files(spark, scon) + read_product_data_from_parquet(spark) if __name__ == "__main__": - main(scon, spark) + main(scon, spark) \ No newline at end of file
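
Note on the rolling averages in frost_analysis: the 5- and 20-year means are computed with SQL window frames (ROWS BETWEEN 4 PRECEDING AND CURRENT ROW, ROWS BETWEEN 19 PRECEDING AND CURRENT ROW), so the first few years are averaged over however many rows exist so far rather than coming back NULL. Below is a minimal, self-contained sketch on invented data (standalone local SparkSession, made-up station id and frost counts, not the project's parquet files) that shows exactly this frame behaviour:

# Sketch of the window frames used in frost_analysis, on toy data only.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("window-frame-demo").getOrCreate()

# One fictitious station, ten invented yearly frost-day counts.
rows = [(1, str(y), fd) for y, fd in zip(range(2000, 2010),
                                         [30, 25, 40, 20, 35, 28, 22, 31, 27, 19])]
spark.createDataFrame(rows, ["stationId", "year", "frost_days"]) \
     .createOrReplaceTempView("station_year_frost")

spark.sql("""
    SELECT year, frost_days,
           AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT)
                                 ROWS BETWEEN 4 PRECEDING AND CURRENT ROW)  AS avg_5,
           AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT)
                                 ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg_20
    FROM station_year_frost
    ORDER BY CAST(year AS INT)
""").show()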
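
One caveat on the frost counts: DWD hourly temperature products commonly encode missing TT_TU readings as -999 (an assumption about the ingested files, not something verified in this patch). If every reading of a day is missing, MAX(TT_TU) becomes -999 and the day would be counted as a frost day. A hedged variant of the daily-max query from compute_daily_and_yearly_frosts that drops the assumed sentinel first (reusing the spark session and the german_stations_data view created in this patch):

# Sketch only: same daily-max aggregation as in compute_daily_and_yearly_frosts,
# but excluding the assumed DWD missing-value sentinel (-999) for TT_TU.
q_daily_max = (
    "SELECT stationId, date, SUBSTR(date,1,4) AS year, MAX(TT_TU) AS max_temp "
    "FROM german_stations_data "
    "WHERE TT_TU > -999 "   # drop readings flagged as missing (assumed sentinel value)
    "GROUP BY stationId, date"
)
daily_max = spark.sql(q_daily_max)
daily_max.createOrReplaceTempView("daily_max")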