From 1b2de95b2e543f8e93160054df00f6f23c66e4a7 Mon Sep 17 00:00:00 2001
From: Valentin Heiserer
Date: Thu, 11 Dec 2025 20:55:44 +0100
Subject: [PATCH] Aufgabe 12: split the stocks/portfolio analysis out of
 Aufgabe 11

---
 Aufgabe 10/Aufgabe10.py  |   1 -
 Aufgabe 11/Aufgabe11.py  | 257 +++++++++++++--------------------------
 Aufgabe 12/Aufgabe12.py  | 123 +++++++++++++++++++
 Aufgabe 12/sparkstart.py |  23 ++++
 4 files changed, 230 insertions(+), 174 deletions(-)
 create mode 100644 Aufgabe 12/Aufgabe12.py
 create mode 100644 Aufgabe 12/sparkstart.py

diff --git a/Aufgabe 10/Aufgabe10.py b/Aufgabe 10/Aufgabe10.py
index 0aa3b52..d7763f9 100644
--- a/Aufgabe 10/Aufgabe10.py
+++ b/Aufgabe 10/Aufgabe10.py
@@ -5,7 +5,6 @@ import matplotlib.pyplot as plt
 
 HDFSPATH = "hdfs://193.174.205.250:54310/"
 
-
 def read_parquets(spark: SparkSession):
     stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
     products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
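The Aufgabe 11 hunks below extract YEAR/QUARTER/MONTH/DAY only after running the raw date column (a 'yyyyMMdd' string in the DWD data) through TO_DATE with an explicit pattern; without the pattern the cast typically yields NULL. A minimal standalone sketch of that conversion, where the local master and the two toy rows are assumptions, not part of the patch:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("to_date_demo").getOrCreate()

# two fake rows shaped like german_stations_data's date/TT_TU columns
spark.createDataFrame([("19990101", 1.5), ("20101231", -3.0)],
                      ["date", "TT_TU"]).createOrReplaceTempView("demo")

# TO_DATE without a pattern typically returns NULL for compact 'yyyyMMdd'
# strings, which is what the "FIX" comment in the hunk below is about
spark.sql("""
    SELECT date,
           TO_DATE(date, 'yyyyMMdd')          AS parsed,
           YEAR(TO_DATE(date, 'yyyyMMdd'))    AS yr,
           QUARTER(TO_DATE(date, 'yyyyMMdd')) AS qt
    FROM demo
""").show()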
diff --git a/Aufgabe 11/Aufgabe11.py b/Aufgabe 11/Aufgabe11.py
index 82631cf..c560356 100644
--- a/Aufgabe 11/Aufgabe11.py
+++ b/Aufgabe 11/Aufgabe11.py
@@ -1,55 +1,58 @@
 from sparkstart import scon, spark
 from pyspark.sql import SparkSession
 import time
 import matplotlib.pyplot as plt
 import pandas as pd
 
-HDFSPATH_STATIONS = "hdfs://193.174.205.250:54310/home/heiserervalentin/"
+HDFSPATH = "hdfs://193.174.205.250:54310/"
 HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/"
 
-def init_view_stations(spark):
-    """Loads the station data for Aufgabe 11"""
-    s_path = HDFSPATH_STATIONS + "german_stations.parquet"
-    d_path = HDFSPATH_STATIONS + "german_stations_data.parquet"
-
-    spark.read.parquet(s_path).createOrReplaceTempView("german_stations")
-    spark.read.parquet(d_path).createOrReplaceTempView("german_stations_data")
+def read_parquets(spark: SparkSession):
+    stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
+    products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
 
-def init_view_stocks(spark):
-    """Loads the stocks data for Aufgabe 12"""
-    # Note: adjust paths if they are located differently on the cluster
-    spark.read.parquet(HDFSPATH_STOCKS + "stocks.parquet").createOrReplaceTempView("stocks")
-    spark.read.parquet(HDFSPATH_STOCKS + "portfolio.parquet").createOrReplaceTempView("portfolio")
+    stations_df = spark.read.parquet(stations_path)
+    stations_df.createOrReplaceTempView("german_stations")
 
-# ---------------------------------------------------------
-# AUFGABE 11
-# ---------------------------------------------------------
+    products_df = spark.read.parquet(products_path)
+    products_df.createOrReplaceTempView("german_stations_data")
+
+    stations_df.cache()
+    products_df.cache()
 
 def task_11a_rollup(spark: SparkSession, station_name="Kempten"):
     print(f"\n--- Aufgabe 11a: Rollup & Plotting for {station_name} ---")
-    start = time.time()
+    start_time = time.time()
 
     # 1. Find the station ID
-    sid_df = spark.sql(f"SELECT stationId FROM german_stations WHERE station_name LIKE '%{station_name}%'")
+    # Case-insensitive search
+    sid_df = spark.sql(f"SELECT stationId FROM german_stations WHERE lower(station_name) LIKE '%{station_name.lower()}%'")
     try:
         sid = sid_df.collect()[0]['stationId']
+        print(f"Station found: {station_name} -> ID {sid}")
     except IndexError:
         print(f"Station {station_name} not found.")
         return
 
     # 2. Prepare the rollup query
+    # FIX: Parse string date 'YYYYMMDD' to real DATE object first
     q_prep = f"""
         SELECT
-            YEAR(date) as yr,
-            QUARTER(date) as qt,
-            MONTH(date) as mo,
-            DAY(date) as da,
+            YEAR(TO_DATE(date, 'yyyyMMdd')) as yr,
+            QUARTER(TO_DATE(date, 'yyyyMMdd')) as qt,
+            MONTH(TO_DATE(date, 'yyyyMMdd')) as mo,
+            DAY(TO_DATE(date, 'yyyyMMdd')) as da,
             TT_TU
         FROM german_stations_data
-        WHERE stationId = {sid} AND TT_TU IS NOT NULL AND TT_TU > -50
+        WHERE stationId = {sid}
+        AND TT_TU IS NOT NULL
+        AND TT_TU > -50
+        AND TT_TU < 60
     """
     spark.sql(q_prep).createOrReplaceTempView("data_prep")
 
+    # 3. Rollup Execution
+    # Note: We use string construction for quarters/months to ensure we get a valid date string for plotting
     q_rollup = """
         SELECT
             yr, qt, mo, da,
@@ -57,10 +60,14 @@ def task_11a_rollup(spark: SparkSession, station_name="Kempten"):
             MAX(TT_TU) as max_temp,
             AVG(TT_TU) as avg_temp,
 
-            -- Date construction for plots
-            DATE(STRING(yr) || '-' || STRING(qt*3-2) || '-01') as qt_date,
+            -- Construct dates for plotting (handling the NULLs from ROLLUP)
+            -- For Quarter: Use 1st month of quarter
+            DATE(concat_ws('-', yr, cast(qt*3-2 as int), '01')) as qt_date,
+            -- For Month: Use 1st day of month
             MAKE_DATE(yr, mo, 1) as mo_date,
+            -- For Year: Use Jan 1st
             MAKE_DATE(yr, 1, 1) as yr_date,
+            -- For Day: Use actual date
             MAKE_DATE(yr, mo, da) as da_date
 
         FROM data_prep
@@ -71,84 +78,97 @@ def task_11a_rollup(spark: SparkSession, station_name="Kempten"):
     df_rollup.cache()
     df_rollup.createOrReplaceTempView("station_rollup")
 
-    # Trigger Action for Cache & Time Measurement
+    # Trigger Action
     count = df_rollup.count()
-    print(f"Rollup computed. Rows: {count}. Duration: {time.time() - start:.2f}s")
-    input(">> 11a: Check Spark UI (Stages/Storage) now. Enter for plots...")
+    print(f"Rollup computed. Rows: {count}. Duration: {time.time() - start_time:.2f}s")
Dauer: {time.time() - start_time:.2f}s") - # Plot 1: Tageswerte (letzte 3 Jahre der Daten) + # --- PLOTTING --- + + # Plot 1: Tageswerte (letzte 3 Jahre) + # Filter: All levels must be present (not null) q_days = """ SELECT da_date as date, avg_temp FROM station_rollup WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NOT NULL - AND yr >= (SELECT MAX(yr) - 2 FROM station_rollup) + AND yr >= (SELECT MAX(yr) - 2 FROM station_rollup WHERE yr IS NOT NULL) ORDER BY date """ pdf_days = spark.sql(q_days).toPandas() - plt.figure(1, figsize=(10, 5)) - plt.plot(pdf_days['date'], pdf_days['avg_temp'], label='Daily Avg', linewidth=0.5) - plt.title(f"{station_name}: Daily Average (Last 3 Years)") - plt.xlabel('Date') - plt.ylabel('Temp °C') - plt.tight_layout() - plt.show() + if pdf_days.empty: + print("Warnung: Keine Daten für Tages-Plot gefunden.") + else: + plt.figure(1, figsize=(10, 5)) + plt.plot(pdf_days['date'], pdf_days['avg_temp'], label='Daily Avg', linewidth=0.5) + plt.title(f"{station_name}: Daily Average (Last 3 Years)") + plt.xlabel('Date') + plt.ylabel('Temp °C') + plt.tight_layout() + plt.show() # Plot 2: Monatswerte (10-20 Jahre) + # Filter: Day is NULL (aggregation level), but Month is NOT NULL q_months = """ SELECT mo_date as date, avg_temp FROM station_rollup WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NULL - AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) + AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL) ORDER BY date """ pdf_months = spark.sql(q_months).toPandas() - plt.figure(2, figsize=(10, 5)) - plt.plot(pdf_months['date'], pdf_months['avg_temp'], color='green', label='Monthly Avg') - plt.title(f"{station_name}: Monthly Average (Last 20 Years)") - plt.xlabel('Date') - plt.ylabel('Temp °C') - plt.tight_layout() - plt.show() + if not pdf_months.empty: + plt.figure(2, figsize=(10, 5)) + plt.plot(pdf_months['date'], pdf_months['avg_temp'], color='green', label='Monthly Avg') + plt.title(f"{station_name}: Monthly Average (Last 20 Years)") + plt.xlabel('Date') + plt.ylabel('Temp °C') + plt.tight_layout() + plt.show() # Plot 3: Quartalswerte + # Filter: Month is NULL, Quarter is NOT NULL q_quarters = """ SELECT qt_date as date, avg_temp FROM station_rollup WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NULL AND da IS NULL - AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) + AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL) ORDER BY date """ pdf_quarters = spark.sql(q_quarters).toPandas() - plt.figure(3, figsize=(10, 5)) - plt.plot(pdf_quarters['date'], pdf_quarters['avg_temp'], color='orange', marker='o', linestyle='-', label='Quarterly Avg') - plt.title(f"{station_name}: Quarterly Average (Last 20 Years)") - plt.show() + if not pdf_quarters.empty: + plt.figure(3, figsize=(10, 5)) + plt.plot(pdf_quarters['date'], pdf_quarters['avg_temp'], color='orange', marker='o', linestyle='-', label='Quarterly Avg') + plt.title(f"{station_name}: Quarterly Average (Last 20 Years)") + plt.tight_layout() + plt.show() # Plot 4: Jahreswerte + # Filter: Quarter is NULL, Year is NOT NULL q_years = """ SELECT yr_date as date, min_temp, max_temp, avg_temp FROM station_rollup WHERE yr IS NOT NULL AND qt IS NULL AND mo IS NULL AND da IS NULL - AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) + AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL) ORDER BY date """ pdf_years = spark.sql(q_years).toPandas() - plt.figure(4, figsize=(10, 5)) - plt.plot(pdf_years['date'], 
-    plt.plot(pdf_years['date'], pdf_years['avg_temp'], color='black', label='Avg')
-    plt.plot(pdf_years['date'], pdf_years['min_temp'], color='blue', label='Min')
-    plt.title(f"{station_name}: Yearly Aggregates (Last 20 Years)")
-    plt.legend()
-    plt.show()
+    if not pdf_years.empty:
+        plt.figure(4, figsize=(10, 5))
+        plt.plot(pdf_years['date'], pdf_years['max_temp'], color='red', label='Max')
+        plt.plot(pdf_years['date'], pdf_years['avg_temp'], color='black', label='Avg')
+        plt.plot(pdf_years['date'], pdf_years['min_temp'], color='blue', label='Min')
+        plt.title(f"{station_name}: Yearly Aggregates (Last 20 Years)")
+        plt.legend()
+        plt.tight_layout()
+        plt.show()
 
 
 def task_11b_rank(spark: SparkSession):
     print("\n--- Aufgabe 11b: TempMonat Ranking ---")
-    start = time.time()
+
 
     q_tempmonat = """
         SELECT
@@ -194,13 +214,12 @@ def task_11b_rank(spark: SparkSession):
             """
     spark.sql(q_rank_global).show(10)
 
-    print(f"Duration 11b: {time.time() - start:.2f}s")
-    input(">> 11b: Check Spark UI (Jobs/Stages). Enter...")
+    print("11b: Done.")
 
 
 def task_11c_groupingsets(spark: SparkSession):
     print("\n--- Aufgabe 11c: Grouping Sets ---")
-    start = time.time()
+
 
     q_prep = """
         SELECT
@@ -234,8 +253,8 @@ def task_11c_groupingsets(spark: SparkSession):
     df_gs.createOrReplaceTempView("grouping_result")
 
     # Action to populate the cache
-    df_gs.count()
-    print(f"Grouping sets computed. Duration: {time.time() - start:.2f}s")
+    df_gs.count()
+    print("Grouping sets computed.")
 
     print("Selection 1: year & bundesland")
     spark.sql("SELECT year, bundesland, avg_t FROM grouping_result WHERE station_name IS NULL AND month IS NULL ORDER BY year DESC, bundesland").show(5)
@@ -245,122 +264,14 @@ def task_11c_groupingsets(spark: SparkSession):
 
     print("Selection 3: month & bundesland (seasonal profile per state)")
     spark.sql("SELECT month, bundesland, avg_t FROM grouping_result WHERE year IS NULL AND station_name IS NULL ORDER BY bundesland, month").show(5)
-
-    input(">> 11c: Check Spark UI (access plans/storage). Enter...")
Enter...") - - -# --------------------------------------------------------- -# AUFGABE 12 -# --------------------------------------------------------- - -def task_12_stocks_analysis(spark: SparkSession): - print("\n--- Aufgabe 12: Stocks & Portfolio ---") - - # a) Erstes und letztes Datum je Symbol - print("a) Min/Max Datum pro Symbol") - t0 = time.time() - q_a = """ - SELECT symbol, MIN(date) as first_date, MAX(date) as last_date - FROM stocks - GROUP BY symbol - ORDER BY symbol - """ - spark.sql(q_a).show(5) - print(f"Zeit a): {time.time()-t0:.2f}s") - - # b) Aggregationen 2009 - print("\nb) High/Low/Avg Close 2009") - t0 = time.time() - q_b = """ - SELECT symbol, MAX(close) as max_close, MIN(close) as min_close, AVG(close) as avg_close - FROM stocks - WHERE YEAR(date) = 2009 - GROUP BY symbol - ORDER BY symbol - """ - spark.sql(q_b).show(5) - print(f"Zeit b): {time.time()-t0:.2f}s") - - # c) Lateral View (Explode Portfolio) - print("\nc) Lateral View: Aktien in Portfolios") - t0 = time.time() - - q_c = """ - SELECT - h.symbol, - SUM(h.amount) as total_shares, - COUNT(p.portfolioId) as num_portfolios, - AVG(h.amount) as avg_per_portfolio - FROM portfolio p - LATERAL VIEW explode(holdings) t AS h - GROUP BY h.symbol - ORDER BY h.symbol - """ - spark.sql(q_c).show(5) - print(f"Zeit c): {time.time()-t0:.2f}s") - - # d) Symbole in keinem Portfolio (Anti Join) - print("\nd) Symbole ohne Portfolio") - t0 = time.time() - q_d = """ - SELECT DISTINCT s.symbol - FROM stocks s - LEFT ANTI JOIN ( - SELECT DISTINCT h.symbol - FROM portfolio p - LATERAL VIEW explode(holdings) t AS h - ) p_sym ON s.symbol = p_sym.symbol - ORDER BY s.symbol - """ - spark.sql(q_d).show(5) - print(f"Zeit d): {time.time()-t0:.2f}s") - - input(">> 12 a-d fertig. Check UI. Enter für e)...") - - # e) Portfolio Wert Ende 2010 - print("\ne) Portfolio Bewertung Ende 2010") - t0 = time.time() - - q_last_price = """ - SELECT symbol, close - FROM ( - SELECT - symbol, - close, - ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY date DESC) as rn - FROM stocks - WHERE YEAR(date) = 2010 - ) tmp - WHERE rn = 1 - """ - spark.sql(q_last_price).createOrReplaceTempView("stocks_2010_end") - - # Schritt 2: Portfolio explodieren, mit Preis joinen, berechnen, summieren - q_val = """ - SELECT - p.portfolioId, - SUM(h.amount * s.close) as portfolio_value_2010 - FROM portfolio p - LATERAL VIEW explode(holdings) t AS h - JOIN stocks_2010_end s ON h.symbol = s.symbol - GROUP BY p.portfolioId - ORDER BY p.portfolioId - """ - spark.sql(q_val).show(5) - print(f"Zeit e): {time.time()-t0:.2f}s") - def main(scon, spark): - init_view_stations(spark) + read_parquets(spark) # Aufgabe 11 task_11a_rollup(spark, station_name="Kempten") task_11b_rank(spark) task_11c_groupingsets(spark) - # Aufgabe 12 - init_view_stocks(spark) - task_12_stocks_analysis(spark) - if __name__ == '__main__': main(scon, spark) \ No newline at end of file diff --git a/Aufgabe 12/Aufgabe12.py b/Aufgabe 12/Aufgabe12.py new file mode 100644 index 0000000..a492ca5 --- /dev/null +++ b/Aufgabe 12/Aufgabe12.py @@ -0,0 +1,123 @@ +from sparkstart import scon, spark +from pyspark.sql import SparkSession +import time +import matplotlib.pyplot as plt +import pandas as pd + +HDFSPATH_STATIONS = "hdfs://193.174.205.250:54310/home/heiserervalentin/" +HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/" + +def init_view_stocks(spark): + """Lädt die Stocks-Daten für Aufgabe 12""" + # Hinweis: Pfade anpassen, falls sie im Cluster anders liegen + spark.read.parquet(HDFSPATH_STOCKS + 
"stocks.parquet").createOrReplaceTempView("stocks") + spark.read.parquet(HDFSPATH_STOCKS + "portfolio.parquet").createOrReplaceTempView("portfolio") + +# --------------------------------------------------------- +# AUFGABE 12 +# --------------------------------------------------------- + +def task_12_stocks_analysis(spark: SparkSession): + print("\n--- Aufgabe 12: Stocks & Portfolio ---") + + # a) Erstes und letztes Datum je Symbol + print("a) Min/Max Datum pro Symbol") + t0 = time.time() + q_a = """ + SELECT symbol, MIN(date) as first_date, MAX(date) as last_date + FROM stocks + GROUP BY symbol + ORDER BY symbol + """ + spark.sql(q_a).show(5) + print(f"Zeit a): {time.time()-t0:.2f}s") + + # b) Aggregationen 2009 + print("\nb) High/Low/Avg Close 2009") + t0 = time.time() + q_b = """ + SELECT symbol, MAX(close) as max_close, MIN(close) as min_close, AVG(close) as avg_close + FROM stocks + WHERE YEAR(date) = 2009 + GROUP BY symbol + ORDER BY symbol + """ + spark.sql(q_b).show(5) + print(f"Zeit b): {time.time()-t0:.2f}s") + + # c) Lateral View (Explode Portfolio) + print("\nc) Lateral View: Aktien in Portfolios") + t0 = time.time() + + q_c = """ + SELECT + h.symbol, + SUM(h.amount) as total_shares, + COUNT(p.portfolioId) as num_portfolios, + AVG(h.amount) as avg_per_portfolio + FROM portfolio p + LATERAL VIEW explode(holdings) t AS h + GROUP BY h.symbol + ORDER BY h.symbol + """ + spark.sql(q_c).show(5) + print(f"Zeit c): {time.time()-t0:.2f}s") + + # d) Symbole in keinem Portfolio (Anti Join) + print("\nd) Symbole ohne Portfolio") + t0 = time.time() + q_d = """ + SELECT DISTINCT s.symbol + FROM stocks s + LEFT ANTI JOIN ( + SELECT DISTINCT h.symbol + FROM portfolio p + LATERAL VIEW explode(holdings) t AS h + ) p_sym ON s.symbol = p_sym.symbol + ORDER BY s.symbol + """ + spark.sql(q_d).show(5) + print(f"Zeit d): {time.time()-t0:.2f}s") + + input(">> 12 a-d fertig. Check UI. 
Enter für e)...") + + # e) Portfolio Wert Ende 2010 + print("\ne) Portfolio Bewertung Ende 2010") + t0 = time.time() + + q_last_price = """ + SELECT symbol, close + FROM ( + SELECT + symbol, + close, + ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY date DESC) as rn + FROM stocks + WHERE YEAR(date) = 2010 + ) tmp + WHERE rn = 1 + """ + spark.sql(q_last_price).createOrReplaceTempView("stocks_2010_end") + + # Schritt 2: Portfolio explodieren, mit Preis joinen, berechnen, summieren + q_val = """ + SELECT + p.portfolioId, + SUM(h.amount * s.close) as portfolio_value_2010 + FROM portfolio p + LATERAL VIEW explode(holdings) t AS h + JOIN stocks_2010_end s ON h.symbol = s.symbol + GROUP BY p.portfolioId + ORDER BY p.portfolioId + """ + spark.sql(q_val).show(5) + print(f"Zeit e): {time.time()-t0:.2f}s") + + +def main(scon, spark): + # Aufgabe 12 + init_view_stocks(spark) + task_12_stocks_analysis(spark) + +if __name__ == '__main__': + main(scon, spark) \ No newline at end of file diff --git a/Aufgabe 12/sparkstart.py b/Aufgabe 12/sparkstart.py new file mode 100644 index 0000000..bdb7010 --- /dev/null +++ b/Aufgabe 12/sparkstart.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- + +""" +Erzeugen einer Spark-Konfiguration +""" + +from pyspark import SparkConf, SparkContext +from pyspark.sql import SparkSession + +# connect to cluster +conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin") +conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") +conf.set("spark.executor.memory", '32g') +conf.set("spark.driver.memory", '8g') +conf.set("spark.cores.max", "40") +scon = SparkContext(conf=conf) + + +spark = SparkSession \ + .builder \ + .appName("Python Spark SQL") \ + .getOrCreate()