diff --git a/Aufgabe 10/Aufgabe10.py b/Aufgabe 10/Aufgabe10.py index 0aa3b52..d7763f9 100644 --- a/Aufgabe 10/Aufgabe10.py +++ b/Aufgabe 10/Aufgabe10.py @@ -5,7 +5,6 @@ import matplotlib.pyplot as plt HDFSPATH = "hdfs://193.174.205.250:54310/" - def read_parquets(spark: SparkSession): stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet" products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet" diff --git a/Aufgabe 11/Aufgabe11.py b/Aufgabe 11/Aufgabe11.py index 82631cf..da4fbfd 100644 --- a/Aufgabe 11/Aufgabe11.py +++ b/Aufgabe 11/Aufgabe11.py @@ -1,366 +1,219 @@ +from __future__ import annotations from sparkstart import scon, spark from pyspark.sql import SparkSession -import time import matplotlib.pyplot as plt -import pandas as pd -HDFSPATH_STATIONS = "hdfs://193.174.205.250:54310/home/heiserervalentin/" -HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/" +HDFSPATH = "hdfs://193.174.205.250:54310/" -def init_view_stations(spark): - """Lädt die Stationsdaten für Aufgabe 11""" - s_path = HDFSPATH_STATIONS + "german_stations.parquet" - d_path = HDFSPATH_STATIONS + "german_stations_data.parquet" - - spark.read.parquet(s_path).createOrReplaceTempView("german_stations") - spark.read.parquet(d_path).createOrReplaceTempView("german_stations_data") +def read_parquet_tables(spark: SparkSession) -> None: + stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet" + products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet" -def init_view_stocks(spark): - """Lädt die Stocks-Daten für Aufgabe 12""" - # Hinweis: Pfade anpassen, falls sie im Cluster anders liegen - spark.read.parquet(HDFSPATH_STOCKS + "stocks.parquet").createOrReplaceTempView("stocks") - spark.read.parquet(HDFSPATH_STOCKS + "portfolio.parquet").createOrReplaceTempView("portfolio") + spark.read.parquet(stations_path).createOrReplaceTempView("german_stations") + spark.read.parquet(products_path).createOrReplaceTempView("german_stations_data") -# --------------------------------------------------------- -# AUFGABE 11 -# --------------------------------------------------------- +# --- Aufgabe A --- -def task_11a_rollup(spark: SparkSession, station_name="Kempten"): - print(f"\n--- Aufgabe 11a: Rollup & Plotting für {station_name} ---") - start = time.time() - - # 1. Station ID finden - sid_df = spark.sql(f"SELECT stationId FROM german_stations WHERE station_name LIKE '%{station_name}%'") - try: - sid = sid_df.collect()[0]['stationId'] - except IndexError: - print(f"Station {station_name} nicht gefunden.") - return - - # 2. 
Rollup Query vorbereiten - q_prep = f""" - SELECT - YEAR(date) as yr, - QUARTER(date) as qt, - MONTH(date) as mo, - DAY(date) as da, - TT_TU - FROM german_stations_data - WHERE stationId = {sid} AND TT_TU IS NOT NULL AND TT_TU > -50 +def create_mma_rollup(spark: SparkSession, station_id: int): + query = f""" + WITH processed_data AS ( + SELECT + TT_TU, + hour AS messtunde, + TO_DATE(SUBSTR(date, 1, 4), 'yyyy') AS jahr, + TO_DATE(CONCAT(SUBSTR(date, 1, 4), '-', LPAD(CAST(QUARTER(TO_DATE(date, 'yyyyMMdd'))*3-2 AS STRING), 2, '0'), '-01')) AS quartal, + TO_DATE(CONCAT(SUBSTR(date, 1, 4), '-', SUBSTR(date, 5, 2), '-01')) AS monat, + TO_DATE(date, 'yyyyMMdd') AS tag + FROM german_stations_data + WHERE stationId = {station_id} + AND TT_TU IS NOT NULL + AND TT_TU <> -999 + ) + SELECT + MIN(TT_TU) AS minTemperatur, + MAX(TT_TU) AS maxTemperatur, + AVG(TT_TU) AS avgTemperatur, + jahr, + quartal, + monat, + tag, + messtunde + FROM processed_data + GROUP BY ROLLUP (jahr, quartal, monat, tag, messtunde) + ORDER BY jahr, quartal, monat, tag, messtunde """ - spark.sql(q_prep).createOrReplaceTempView("data_prep") - - q_rollup = """ - SELECT - yr, qt, mo, da, - MIN(TT_TU) as min_temp, - MAX(TT_TU) as max_temp, - AVG(TT_TU) as avg_temp, - - -- Datums-Konstruktion für Plots - DATE(STRING(yr) || '-' || STRING(qt*3-2) || '-01') as qt_date, - MAKE_DATE(yr, mo, 1) as mo_date, - MAKE_DATE(yr, 1, 1) as yr_date, - MAKE_DATE(yr, mo, da) as da_date - - FROM data_prep - GROUP BY ROLLUP(yr, qt, mo, da) - """ - - df_rollup = spark.sql(q_rollup) - df_rollup.cache() - df_rollup.createOrReplaceTempView("station_rollup") - - # Trigger Action for Cache & Time Measurement - count = df_rollup.count() - print(f"Rollup berechnet. Zeilen: {count}. Dauer: {time.time() - start:.2f}s") - input(">> 11a: Check Spark UI (Stages/Storage) jetzt. 
Enter für Plots...") + df = spark.sql(query) + df.createOrReplaceTempView("mmacdcdata") + df.cache() + df.show(10) - # Plot 1: Tageswerte (letzte 3 Jahre der Daten) - q_days = """ - SELECT da_date as date, avg_temp - FROM station_rollup - WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NOT NULL - AND yr >= (SELECT MAX(yr) - 2 FROM station_rollup) - ORDER BY date - """ - pdf_days = spark.sql(q_days).toPandas() +def plot_date_values(spark: SparkSession, level: str): + filters = { + "days": "YEAR(jahr) > 2017 AND YEAR(jahr) < 2021 AND messtunde IS NULL AND tag IS NOT NULL", + "months": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NOT NULL", + "quartals": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NULL AND quartal IS NOT NULL", + "years": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NULL AND quartal IS NULL AND jahr IS NOT NULL" + } + x_col = {"days": "tag", "months": "monat", "quartals": "quartal", "years": "jahr"} - plt.figure(1, figsize=(10, 5)) - plt.plot(pdf_days['date'], pdf_days['avg_temp'], label='Daily Avg', linewidth=0.5) - plt.title(f"{station_name}: Daily Average (Last 3 Years)") - plt.xlabel('Date') - plt.ylabel('Temp °C') - plt.tight_layout() - plt.show() + pdf = spark.sql(f"SELECT * FROM mmacdcdata WHERE {filters[level]}").toPandas() + if pdf.empty: return - # Plot 2: Monatswerte (10-20 Jahre) - q_months = """ - SELECT mo_date as date, avg_temp - FROM station_rollup - WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NULL - AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) - ORDER BY date - """ - pdf_months = spark.sql(q_months).toPandas() - - plt.figure(2, figsize=(10, 5)) - plt.plot(pdf_months['date'], pdf_months['avg_temp'], color='green', label='Monthly Avg') - plt.title(f"{station_name}: Monthly Average (Last 20 Years)") - plt.xlabel('Date') - plt.ylabel('Temp °C') - plt.tight_layout() - plt.show() - - # Plot 3: Quartalswerte - q_quarters = """ - SELECT qt_date as date, avg_temp - FROM station_rollup - WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NULL AND da IS NULL - AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) - ORDER BY date - """ - pdf_quarters = spark.sql(q_quarters).toPandas() - - plt.figure(3, figsize=(10, 5)) - plt.plot(pdf_quarters['date'], pdf_quarters['avg_temp'], color='orange', marker='o', linestyle='-', label='Quarterly Avg') - plt.title(f"{station_name}: Quarterly Average (Last 20 Years)") - plt.show() - - # Plot 4: Jahreswerte - q_years = """ - SELECT yr_date as date, min_temp, max_temp, avg_temp - FROM station_rollup - WHERE yr IS NOT NULL AND qt IS NULL AND mo IS NULL AND da IS NULL - AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) - ORDER BY date - """ - pdf_years = spark.sql(q_years).toPandas() - - plt.figure(4, figsize=(10, 5)) - plt.plot(pdf_years['date'], pdf_years['max_temp'], color='red', label='Max') - plt.plot(pdf_years['date'], pdf_years['avg_temp'], color='black', label='Avg') - plt.plot(pdf_years['date'], pdf_years['min_temp'], color='blue', label='Min') - plt.title(f"{station_name}: Yearly Aggregates (Last 20 Years)") + plt.figure(figsize=(10, 5)) + plt.plot(pdf[x_col[level]], pdf["maxTemperatur"], "red", label="Max") + plt.plot(pdf[x_col[level]], pdf["avgTemperatur"], "green", label="Avg") + plt.plot(pdf[x_col[level]], pdf["minTemperatur"], "blue", label="Min") + plt.title(f"{level.capitalize()}") plt.legend() + plt.grid(True) plt.show() +# --- Aufgabe B --- -def task_11b_rank(spark: SparkSession): - 
print("\n--- Aufgabe 11b: TempMonat Ranking ---") - start = time.time() - - q_tempmonat = """ +def create_tempmonat(spark: SparkSession): + query = """ + WITH base_data AS ( SELECT d.stationId, - s.station_name, - SUBSTR(CAST(d.date AS STRING), 1, 4) as year, - SUBSTR(CAST(d.date AS STRING), 6, 2) as month, - MIN(d.TT_TU) as min_t, - MAX(d.TT_TU) as max_t, - AVG(d.TT_TU) as avg_t + gs.station_name AS stationsname, + d.TT_TU, + TO_DATE(SUBSTR(d.date, 1, 4), 'yyyy') AS jahr_val, + TO_DATE(CONCAT(SUBSTR(d.date, 1, 4), '-', SUBSTR(d.date, 5, 2), '-01')) AS monat_val FROM german_stations_data d - JOIN german_stations s ON d.stationId = s.stationId - WHERE d.TT_TU IS NOT NULL AND d.TT_TU > -50 - GROUP BY d.stationId, s.station_name, year, month + JOIN german_stations gs ON d.stationId = gs.stationId + WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999 + ) + SELECT + stationId, + stationsname, + MIN(TT_TU) AS minTemperatur, + MAX(TT_TU) AS maxTemperatur, + AVG(TT_TU) AS avgTemperatur, + jahr_val AS jahr, + monat_val AS monat + FROM base_data + GROUP BY stationId, stationsname, jahr_val, monat_val """ - df_tm = spark.sql(q_tempmonat) - df_tm.createOrReplaceTempView("tempmonat") - - # 1. Ranking Partitioniert nach Monat im Jahr 2015 - print(" > Berechne Ranking für 2015 (partitioniert nach Monat)...") - q_rank_2015 = """ - SELECT - month, station_name, min_t, - RANK() OVER (PARTITION BY month ORDER BY min_t ASC) as rank_min, - RANK() OVER (PARTITION BY month ORDER BY max_t ASC) as rank_max, - RANK() OVER (PARTITION BY month ORDER BY avg_t ASC) as rank_avg - FROM tempmonat - WHERE year = '2015' - ORDER BY rank_min, month + spark.sql(query).cache().createOrReplaceTempView("tempmonat") + +def rank_temperatures(spark: SparkSession, limit: int, year: int = None): + where_clause = f"WHERE YEAR(jahr) = {year}" if year else "" + query = f""" + SELECT stationid, stationsname, monat, minTemperatur, + RANK() OVER (ORDER BY minTemperatur ASC) AS rangMIN, + maxTemperatur, + RANK() OVER (ORDER BY maxTemperatur DESC) AS rangMAX, + avgTemperatur, + RANK() OVER (ORDER BY avgTemperatur DESC) AS rangAVG + FROM tempmonat + {where_clause} + ORDER BY rangMIN """ - spark.sql(q_rank_2015).show(10) + spark.sql(query).show(limit, truncate=False) - # 2. Globales Ranking (über alle Monate/Jahre hinweg) - print(" > Berechne Ranking global (kälteste Monate aller Zeiten)...") - q_rank_global = """ +# --- Aufgabe C --- + +def create_grouping_sets_view(spark: SparkSession): + query = """ + WITH base_gs AS ( SELECT - year, month, station_name, min_t, - RANK() OVER (ORDER BY min_t ASC) as rank_min, - RANK() OVER (ORDER BY max_t ASC) as rank_max, - RANK() OVER (ORDER BY avg_t ASC) as rank_avg - FROM tempmonat - ORDER BY rank_min - """ - spark.sql(q_rank_global).show(10) - - print(f"Dauer 11b: {time.time() - start:.2f}s") - input(">> 11b: Check Spark UI (Jobs/Stages). 
Enter...") - - -def task_11c_groupingsets(spark: SparkSession): - print("\n--- Aufgabe 11c: Grouping Sets ---") - start = time.time() - - q_prep = """ - SELECT - CAST(SUBSTR(CAST(d.date AS STRING), 1, 4) AS INT) as year, - CAST(SUBSTR(CAST(d.date AS STRING), 6, 2) AS INT) as month, - s.station_name, - s.bundesland, - d.TT_TU + d.stationId, + -- TRIM entfernt Leerzeichen für saubere Tabellen + TRIM(gs.bundesland) AS bundesland_clean, + d.TT_TU, + -- Extrahiere Jahr und Kalendermonat (1-12) + YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS jahr_val, + MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS monat_val FROM german_stations_data d - JOIN german_stations s ON d.stationId = s.stationId - WHERE d.TT_TU > -50 + JOIN german_stations gs ON d.stationId = gs.stationId + WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999 + ) + SELECT + stationId, + bundesland_clean AS bundesland, + jahr_val AS jahr, + monat_val AS monat, + MIN(TT_TU) AS minTemperatur, + MAX(TT_TU) AS maxTemperatur, + AVG(TT_TU) AS avgTemperatur + FROM base_gs + GROUP BY GROUPING SETS ( + (bundesland_clean, jahr_val), -- 1. Jahr und Bundesland + (stationId, jahr_val), -- 2. Jahr und Station + (bundesland_clean, monat_val) -- 3. Monat und Bundesland + ) """ - spark.sql(q_prep).createOrReplaceTempView("gs_base") + df = spark.sql(query) + + df.cache() + + df.createOrReplaceTempView("tempmma_gs") - q_sets = """ + +def show_seperate_gs(spark: SparkSession, limit: int): + + # Filter: stationId muss NULL sein, monat muss NULL sein + spark.sql(""" SELECT - year, - month, + jahr, bundesland, - station_name, - MIN(TT_TU) as min_t, MAX(TT_TU) as max_t, AVG(TT_TU) as avg_t - FROM gs_base - GROUP BY GROUPING SETS ( - (year, bundesland), - (year, station_name), - (month, bundesland) - ) - """ - df_gs = spark.sql(q_sets) - df_gs.cache() - df_gs.createOrReplaceTempView("grouping_result") - - # Action zum Cachen - df_gs.count() - print(f"Grouping Sets berechnet. Dauer: {time.time() - start:.2f}s") + minTemperatur, + maxTemperatur, + ROUND(avgTemperatur, 2) as avgTemperatur + FROM tempmma_gs + WHERE bundesland IS NOT NULL + AND jahr IS NOT NULL + AND stationId IS NULL + AND monat IS NULL + ORDER BY jahr DESC, bundesland ASC + """).show(limit, truncate=False) - print("Auswahl 1: Jahr & Bundesland") - spark.sql("SELECT year, bundesland, avg_t FROM grouping_result WHERE station_name IS NULL AND month IS NULL ORDER BY year DESC, bundesland").show(5) - - print("Auswahl 2: Jahr & Station") - spark.sql("SELECT year, station_name, avg_t FROM grouping_result WHERE bundesland IS NULL AND month IS NULL ORDER BY year DESC, station_name").show(5) - - print("Auswahl 3: Monat & Bundesland (Jahreszeitlicher Verlauf je Land)") - spark.sql("SELECT month, bundesland, avg_t FROM grouping_result WHERE year IS NULL AND station_name IS NULL ORDER BY bundesland, month").show(5) - - input(">> 11c: Check Spark UI (Zugriffspläne/Storage). 
Enter...") - - -# --------------------------------------------------------- -# AUFGABE 12 -# --------------------------------------------------------- - -def task_12_stocks_analysis(spark: SparkSession): - print("\n--- Aufgabe 12: Stocks & Portfolio ---") - - # a) Erstes und letztes Datum je Symbol - print("a) Min/Max Datum pro Symbol") - t0 = time.time() - q_a = """ - SELECT symbol, MIN(date) as first_date, MAX(date) as last_date - FROM stocks - GROUP BY symbol - ORDER BY symbol - """ - spark.sql(q_a).show(5) - print(f"Zeit a): {time.time()-t0:.2f}s") - - # b) Aggregationen 2009 - print("\nb) High/Low/Avg Close 2009") - t0 = time.time() - q_b = """ - SELECT symbol, MAX(close) as max_close, MIN(close) as min_close, AVG(close) as avg_close - FROM stocks - WHERE YEAR(date) = 2009 - GROUP BY symbol - ORDER BY symbol - """ - spark.sql(q_b).show(5) - print(f"Zeit b): {time.time()-t0:.2f}s") - - # c) Lateral View (Explode Portfolio) - print("\nc) Lateral View: Aktien in Portfolios") - t0 = time.time() - - q_c = """ + # Filter: bundesland muss NULL sein, monat muss NULL sein + spark.sql(""" SELECT - h.symbol, - SUM(h.amount) as total_shares, - COUNT(p.portfolioId) as num_portfolios, - AVG(h.amount) as avg_per_portfolio - FROM portfolio p - LATERAL VIEW explode(holdings) t AS h - GROUP BY h.symbol - ORDER BY h.symbol - """ - spark.sql(q_c).show(5) - print(f"Zeit c): {time.time()-t0:.2f}s") + jahr, + stationId, + minTemperatur, + maxTemperatur, + ROUND(avgTemperatur, 2) as avgTemperatur + FROM tempmma_gs + WHERE stationId IS NOT NULL + AND jahr IS NOT NULL + AND bundesland IS NULL + ORDER BY jahr DESC, stationId ASC + """).show(limit, truncate=False) - # d) Symbole in keinem Portfolio (Anti Join) - print("\nd) Symbole ohne Portfolio") - t0 = time.time() - q_d = """ - SELECT DISTINCT s.symbol - FROM stocks s - LEFT ANTI JOIN ( - SELECT DISTINCT h.symbol - FROM portfolio p - LATERAL VIEW explode(holdings) t AS h - ) p_sym ON s.symbol = p_sym.symbol - ORDER BY s.symbol - """ - spark.sql(q_d).show(5) - print(f"Zeit d): {time.time()-t0:.2f}s") - - input(">> 12 a-d fertig. Check UI. 
Enter für e)...") - - # e) Portfolio Wert Ende 2010 - print("\ne) Portfolio Bewertung Ende 2010") - t0 = time.time() - - q_last_price = """ - SELECT symbol, close - FROM ( - SELECT - symbol, - close, - ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY date DESC) as rn - FROM stocks - WHERE YEAR(date) = 2010 - ) tmp - WHERE rn = 1 - """ - spark.sql(q_last_price).createOrReplaceTempView("stocks_2010_end") - - # Schritt 2: Portfolio explodieren, mit Preis joinen, berechnen, summieren - q_val = """ + # Filter: stationId muss NULL sein, jahr muss NULL sein + spark.sql(""" SELECT - p.portfolioId, - SUM(h.amount * s.close) as portfolio_value_2010 - FROM portfolio p - LATERAL VIEW explode(holdings) t AS h - JOIN stocks_2010_end s ON h.symbol = s.symbol - GROUP BY p.portfolioId - ORDER BY p.portfolioId - """ - spark.sql(q_val).show(5) - print(f"Zeit e): {time.time()-t0:.2f}s") - + monat, + bundesland, + minTemperatur, + maxTemperatur, + ROUND(avgTemperatur, 2) as avgTemperatur + FROM tempmma_gs + WHERE bundesland IS NOT NULL + AND monat IS NOT NULL + AND jahr IS NULL + ORDER BY monat ASC, bundesland ASC + """).show(limit, truncate=False) def main(scon, spark): - init_view_stations(spark) + read_parquet_tables(spark) - # Aufgabe 11 - task_11a_rollup(spark, station_name="Kempten") - task_11b_rank(spark) - task_11c_groupingsets(spark) + # Kempten ID = 2559 + create_mma_rollup(spark, 2559) + for level in ["years", "quartals", "months", "days"]: + plot_date_values(spark, level) + + create_tempmonat(spark) + print("Rangfolgen 2015:") + rank_temperatures(spark, 18, 2015) + print("Rangfolgen Gesamt:") + rank_temperatures(spark, 18) + + create_grouping_sets_view(spark) + show_seperate_gs(spark, 10) - # Aufgabe 12 - init_view_stocks(spark) - task_12_stocks_analysis(spark) - -if __name__ == '__main__': +if __name__ == "__main__": main(scon, spark) \ No newline at end of file diff --git a/Aufgabe 12/Aufgabe12.py b/Aufgabe 12/Aufgabe12.py new file mode 100644 index 0000000..40eeb34 --- /dev/null +++ b/Aufgabe 12/Aufgabe12.py @@ -0,0 +1,190 @@ +from sparkstart import spark +from sparkstart import scon, spark +from pyspark.sql import SparkSession + +HDFSPATH = "hdfs://193.174.205.250:54310/" +SOURCEPATH = HDFSPATH + "stocks/" + +def read_data(spark: SparkSession) -> None: + """ + Loads the existing Parquet files from HDFS into Spark Views. + """ + print(f"--- Loading Views from {SOURCEPATH} ---") + try: + # Load Stocks + spark.read.parquet(SOURCEPATH + "stocks.parquet").createOrReplaceTempView("stocks") + print("-> View 'stocks' loaded.") + + # Load Portfolio + spark.read.parquet(SOURCEPATH + "portfolio.parquet").createOrReplaceTempView("portfolio") + print("-> View 'portfolio' loaded.") + + except Exception as e: + print(f"CRITICAL ERROR: Could not load data. 
{e}") + print("Please check if the path exists in HDFS.") + +# --- Aufgabe A --- +def first_last_quotation(spark: SparkSession, num: int = 10) -> None: + print("\n--- Aufgabe A: First/Last Quotation ---") + query = """ + SELECT symbol, + MIN(dt) AS altNotierung, + MAX(dt) AS neuNotierung + FROM stocks + GROUP BY symbol + ORDER BY symbol + """ + + df_quotation = spark.sql(query) + df_quotation.show(num, truncate=False) + + df_quotation.write.mode('overwrite').parquet(HDFSPATH + "home/heiserervalentin/nyse1.parquet") + print("-> Imported nyse1") + + +# --- Aufgabe B --- +def min_max_avg_close(spark: SparkSession, num: int = 10) -> None: + print("\n--- Aufgabe B: Min/Max/Avg Close 2009 ---") + query = """ + SELECT symbol, + MIN(close) AS minClose, + MAX(close) AS maxClose, + AVG(close) AS avgClose + FROM stocks + WHERE YEAR(dt) = 2009 + GROUP BY symbol + ORDER BY symbol + """ + + df_close = spark.sql(query) + df_close.show(num, truncate=False) + + df_close.write.mode('overwrite').parquet(HDFSPATH + "home/heiserervalentin/nyse2.parquet") + print("-> Imported nyse2") + + +# --- Aufgabe C --- +def sum_count_avg_portfolios(spark: SparkSession, num: int = 10) -> None: + print("\n--- Aufgabe C: Portfolio Aggregations ---") + # 1. Explode + query_explode = """ + SELECT pid, Attr + FROM portfolio + LATERAL VIEW EXPLODE(bonds) AS Attr + """ + df_temp = spark.sql(query_explode) + df_temp.createOrReplaceTempView("temp") + + # 2. Aggregate + query_agg = """ + SELECT Attr.symbol AS symbol, + COUNT(pid) AS anzpid, + SUM(Attr.num) AS anzAktien, + AVG(Attr.num) AS avgAnzAktien + FROM temp + GROUP BY symbol + ORDER BY symbol + """ + + df_sum_sel_cnt_avg = spark.sql(query_agg) + df_sum_sel_cnt_avg.show(num, truncate=False) + + df_sum_sel_cnt_avg.write.mode('overwrite').parquet(HDFSPATH + "home/heiserervalentin/nyse3.parquet") + print("-> Imported nyse3") + + +# --- Aufgabe D --- +def symbols_not_in_portfolio(spark: SparkSession, num: int = 10) -> None: + print("\n--- Aufgabe D: Symbols not in Portfolio ---") + query_explode = """ + SELECT Attr + FROM portfolio + LATERAL VIEW EXPLODE(bonds) AS Attr + """ + + df_temp = spark.sql(query_explode) + df_temp.createOrReplaceTempView("tempport") + + query_distinct = """ + SELECT DISTINCT s.symbol + FROM stocks s + LEFT OUTER JOIN tempport p ON s.symbol = p.Attr.symbol + WHERE p.Attr.symbol IS NULL + ORDER BY s.symbol + """ + + df_symbols = spark.sql(query_distinct) + df_symbols.show(num, truncate=False) + + df_symbols.write.mode('overwrite').parquet(HDFSPATH + "home/heiserervalentin/nyse4.parquet") + print("-> Imported nyse4") + + +# --- Aufgabe E --- +def value_portfolio_2010(spark: SparkSession, num: int = 10) -> None: + print("\n--- Aufgabe E: Portfolio Value 2010 ---") + + # 1. Portfolio explodieren + query_portfolio = """ + SELECT pid, Attr.symbol AS symbol, Attr.num AS anzAktien + FROM portfolio + LATERAL VIEW EXPLODE(bonds) AS Attr + ORDER BY pid + """ + df_lview = spark.sql(query_portfolio) + df_lview.createOrReplaceTempView("tempportfolio") + # df_lview.show(num, truncate=False) # Optional zur Kontrolle + + # 2. 
Latest quotation per symbol in 2010 (filter to 2010 before taking MAX(dt))
+    query_stocks = """
+        SELECT s.symbol, s.dt, s.close
+        FROM stocks s
+        INNER JOIN (
+            SELECT symbol, MAX(dt) AS datum
+            FROM stocks
+            WHERE YEAR(dt) = 2010
+            GROUP BY symbol
+        ) AS grpStocks
+        ON s.symbol = grpStocks.symbol AND s.dt = grpStocks.datum
+        ORDER BY datum
+    """
+    df_2010 = spark.sql(query_stocks)
+    df_2010.createOrReplaceTempView("tempstocks")
+    # df_2010.show(num, truncate=False) # optional check
+
+    # 3. Compute the value of each position (join)
+    query_value = """
+        SELECT p.*, s.close * p.anzAktien AS wert
+        FROM tempportfolio p, tempstocks s
+        WHERE s.symbol = p.symbol
+        ORDER BY p.pid
+    """
+    df_value = spark.sql(query_value)
+    df_value.createOrReplaceTempView("tempvalue")
+    # df_value.show(num, truncate=False) # optional check
+
+    # 4. Aggregate the total value per portfolio
+    query_sum = """
+        SELECT pid, SUM(wert) AS gesamtwert
+        FROM tempvalue
+        GROUP BY pid
+        ORDER BY pid
+    """
+    df_sum = spark.sql(query_sum)
+    df_sum.show(num, truncate=False)
+
+    df_sum.write.mode('overwrite').parquet(HDFSPATH + "home/heiserervalentin/nyse5.parquet")
+    print("-> Saved nyse5.parquet")
+
+
+def main(scon, spark):
+    read_data(spark)
+
+    first_last_quotation(spark, 10)
+    min_max_avg_close(spark, 10)
+    sum_count_avg_portfolios(spark, 5)
+    symbols_not_in_portfolio(spark, 5)
+    value_portfolio_2010(spark, 10)
+
+if __name__ == "__main__":
+    main(scon, spark)
\ No newline at end of file
diff --git a/Aufgabe 12/sparkstart.py b/Aufgabe 12/sparkstart.py
new file mode 100644
index 0000000..bdb7010
--- /dev/null
+++ b/Aufgabe 12/sparkstart.py
@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+
+"""
+Create a Spark configuration
+"""
+
+from pyspark import SparkConf, SparkContext
+from pyspark.sql import SparkSession
+
+# connect to cluster
+conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
+conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+conf.set("spark.executor.memory", '32g')
+conf.set("spark.driver.memory", '8g')
+conf.set("spark.cores.max", "40")
+scon = SparkContext(conf=conf)
+
+
+spark = SparkSession \
+    .builder \
+    .appName("Python Spark SQL") \
+    .getOrCreate()