from __future__ import annotations from sparkstart import scon, spark from pyspark.sql import SparkSession import matplotlib.pyplot as plt HDFSPATH = "hdfs://193.174.205.250:54310/" def read_parquet_tables(spark: SparkSession) -> None: stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet" products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet" spark.read.parquet(stations_path).createOrReplaceTempView("german_stations") spark.read.parquet(products_path).createOrReplaceTempView("german_stations_data") # --- Aufgabe A --- def create_mma_rollup(spark: SparkSession, station_id: int): query = f""" WITH processed_data AS ( SELECT TT_TU, hour AS messtunde, TO_DATE(SUBSTR(date, 1, 4), 'yyyy') AS jahr, TO_DATE(CONCAT(SUBSTR(date, 1, 4), '-', LPAD(CAST(QUARTER(TO_DATE(date, 'yyyyMMdd'))*3-2 AS STRING), 2, '0'), '-01')) AS quartal, TO_DATE(CONCAT(SUBSTR(date, 1, 4), '-', SUBSTR(date, 5, 2), '-01')) AS monat, TO_DATE(date, 'yyyyMMdd') AS tag FROM german_stations_data WHERE stationId = {station_id} AND TT_TU IS NOT NULL AND TT_TU <> -999 ) SELECT MIN(TT_TU) AS minTemperatur, MAX(TT_TU) AS maxTemperatur, AVG(TT_TU) AS avgTemperatur, jahr, quartal, monat, tag, messtunde FROM processed_data GROUP BY ROLLUP (jahr, quartal, monat, tag, messtunde) ORDER BY jahr, quartal, monat, tag, messtunde """ df = spark.sql(query) df.createOrReplaceTempView("mmacdcdata") df.cache() df.show(10) def plot_date_values(spark: SparkSession, level: str): filters = { "days": "YEAR(jahr) > 2017 AND YEAR(jahr) < 2021 AND messtunde IS NULL AND tag IS NOT NULL", "months": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NOT NULL", "quartals": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NULL AND quartal IS NOT NULL", "years": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NULL AND quartal IS NULL AND jahr IS NOT NULL" } x_col = {"days": "tag", "months": "monat", "quartals": "quartal", "years": "jahr"} pdf = spark.sql(f"SELECT * FROM mmacdcdata WHERE {filters[level]}").toPandas() if pdf.empty: return plt.figure(figsize=(10, 5)) plt.plot(pdf[x_col[level]], pdf["maxTemperatur"], "red", label="Max") plt.plot(pdf[x_col[level]], pdf["avgTemperatur"], "green", label="Avg") plt.plot(pdf[x_col[level]], pdf["minTemperatur"], "blue", label="Min") plt.title(f"{level.capitalize()}") plt.legend() plt.grid(True) plt.show() # --- Aufgabe B --- def create_tempmonat(spark: SparkSession): query = """ WITH base_data AS ( SELECT d.stationId, gs.station_name AS stationsname, d.TT_TU, TO_DATE(SUBSTR(d.date, 1, 4), 'yyyy') AS jahr_val, TO_DATE(CONCAT(SUBSTR(d.date, 1, 4), '-', SUBSTR(d.date, 5, 2), '-01')) AS monat_val FROM german_stations_data d JOIN german_stations gs ON d.stationId = gs.stationId WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999 ) SELECT stationId, stationsname, MIN(TT_TU) AS minTemperatur, MAX(TT_TU) AS maxTemperatur, AVG(TT_TU) AS avgTemperatur, jahr_val AS jahr, monat_val AS monat FROM base_data GROUP BY stationId, stationsname, jahr_val, monat_val """ spark.sql(query).cache().createOrReplaceTempView("tempmonat") def rank_temperatures(spark: SparkSession, limit: int, year: int = None): where_clause = f"WHERE YEAR(jahr) = {year}" if year else "" query = f""" SELECT stationid, stationsname, monat, minTemperatur, RANK() OVER (ORDER BY minTemperatur ASC) AS rangMIN, maxTemperatur, RANK() OVER (ORDER BY maxTemperatur DESC) AS rangMAX, avgTemperatur, RANK() OVER (ORDER BY avgTemperatur DESC) AS rangAVG FROM tempmonat {where_clause} ORDER BY rangMIN """ spark.sql(query).show(limit, truncate=False) # --- Aufgabe C --- def create_grouping_sets_view(spark: SparkSession): query = """ WITH base_gs AS ( SELECT d.stationId, -- TRIM entfernt Leerzeichen für saubere Tabellen TRIM(gs.bundesland) AS bundesland_clean, d.TT_TU, -- Extrahiere Jahr und Kalendermonat (1-12) YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS jahr_val, MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS monat_val FROM german_stations_data d JOIN german_stations gs ON d.stationId = gs.stationId WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999 ) SELECT stationId, bundesland_clean AS bundesland, jahr_val AS jahr, monat_val AS monat, MIN(TT_TU) AS minTemperatur, MAX(TT_TU) AS maxTemperatur, AVG(TT_TU) AS avgTemperatur FROM base_gs GROUP BY GROUPING SETS ( (bundesland_clean, jahr_val), -- 1. Jahr und Bundesland (stationId, jahr_val), -- 2. Jahr und Station (bundesland_clean, monat_val) -- 3. Monat und Bundesland ) """ df = spark.sql(query) df.cache() df.createOrReplaceTempView("tempmma_gs") def show_seperate_gs(spark: SparkSession, limit: int): # Filter: stationId muss NULL sein, monat muss NULL sein spark.sql(""" SELECT jahr, bundesland, minTemperatur, maxTemperatur, ROUND(avgTemperatur, 2) as avgTemperatur FROM tempmma_gs WHERE bundesland IS NOT NULL AND jahr IS NOT NULL AND stationId IS NULL AND monat IS NULL ORDER BY jahr DESC, bundesland ASC """).show(limit, truncate=False) # Filter: bundesland muss NULL sein, monat muss NULL sein spark.sql(""" SELECT jahr, stationId, minTemperatur, maxTemperatur, ROUND(avgTemperatur, 2) as avgTemperatur FROM tempmma_gs WHERE stationId IS NOT NULL AND jahr IS NOT NULL AND bundesland IS NULL ORDER BY jahr DESC, stationId ASC """).show(limit, truncate=False) # Filter: stationId muss NULL sein, jahr muss NULL sein spark.sql(""" SELECT monat, bundesland, minTemperatur, maxTemperatur, ROUND(avgTemperatur, 2) as avgTemperatur FROM tempmma_gs WHERE bundesland IS NOT NULL AND monat IS NOT NULL AND jahr IS NULL ORDER BY monat ASC, bundesland ASC """).show(limit, truncate=False) def main(scon, spark): read_parquet_tables(spark) # Kempten ID = 2559 create_mma_rollup(spark, 2559) for level in ["years", "quartals", "months", "days"]: plot_date_values(spark, level) create_tempmonat(spark) print("Rangfolgen 2015:") rank_temperatures(spark, 18, 2015) print("Rangfolgen Gesamt:") rank_temperatures(spark, 18) create_grouping_sets_view(spark) show_seperate_gs(spark, 10) if __name__ == "__main__": main(scon, spark)