from sparkstart import scon, spark
from pyspark.sql import SparkSession

HDFSPATH = "hdfs://193.174.205.250:54310/"
SOURCEPATH = HDFSPATH + "stocks/"


def read_data(spark: SparkSession) -> None:
    """
    Loads the existing Parquet files from HDFS into Spark temp views.
    """
    print(f"--- Loading Views from {SOURCEPATH} ---")
    try:
        # Load stocks
        spark.read.parquet(SOURCEPATH + "stocks.parquet").createOrReplaceTempView("stocks")
        print("-> View 'stocks' loaded.")
        # Load portfolio
        spark.read.parquet(SOURCEPATH + "portfolio.parquet").createOrReplaceTempView("portfolio")
        print("-> View 'portfolio' loaded.")
    except Exception as e:
        print(f"CRITICAL ERROR: Could not load data. {e}")
        print("Please check if the path exists in HDFS.")


# --- Task A ---
def first_last_quotation(spark: SparkSession, num: int = 10) -> None:
    print("\n--- Task A: First/Last Quotation ---")
    # Oldest and newest quotation date per symbol
    query = """
        SELECT symbol,
               MIN(dt) AS altNotierung,
               MAX(dt) AS neuNotierung
        FROM stocks
        GROUP BY symbol
        ORDER BY symbol
    """
    df_quotation = spark.sql(query)
    df_quotation.show(num, truncate=False)
    df_quotation.write.mode('overwrite').parquet(HDFSPATH + "home/heiserervalentin/nyse1.parquet")
    print("-> Wrote nyse1")


# --- Task B ---
def min_max_avg_close(spark: SparkSession, num: int = 10) -> None:
    print("\n--- Task B: Min/Max/Avg Close 2009 ---")
    query = """
        SELECT symbol,
               MIN(close) AS minClose,
               MAX(close) AS maxClose,
               AVG(close) AS avgClose
        FROM stocks
        WHERE YEAR(dt) = 2009
        GROUP BY symbol
        ORDER BY symbol
    """
    df_close = spark.sql(query)
    df_close.show(num, truncate=False)
    df_close.write.mode('overwrite').parquet(HDFSPATH + "home/heiserervalentin/nyse2.parquet")
    print("-> Wrote nyse2")


# --- Task C ---
def sum_count_avg_portfolios(spark: SparkSession, num: int = 10) -> None:
    print("\n--- Task C: Portfolio Aggregations ---")
    # 1. Explode the bonds array: one row per (pid, bond) pair
    query_explode = """
        SELECT pid, Attr
        FROM portfolio
        LATERAL VIEW EXPLODE(bonds) AS Attr
    """
    df_temp = spark.sql(query_explode)
    df_temp.createOrReplaceTempView("temp")
    # 2. Aggregate per symbol
    query_agg = """
        SELECT Attr.symbol AS symbol,
               COUNT(pid) AS anzpid,
               SUM(Attr.num) AS anzAktien,
               AVG(Attr.num) AS avgAnzAktien
        FROM temp
        GROUP BY Attr.symbol
        ORDER BY symbol
    """
    df_sum_sel_cnt_avg = spark.sql(query_agg)
    df_sum_sel_cnt_avg.show(num, truncate=False)
    df_sum_sel_cnt_avg.write.mode('overwrite').parquet(HDFSPATH + "home/heiserervalentin/nyse3.parquet")
    print("-> Wrote nyse3")


# --- Task D ---
def symbols_not_in_portfolio(spark: SparkSession, num: int = 10) -> None:
    print("\n--- Task D: Symbols not in Portfolio ---")
    query_explode = """
        SELECT Attr
        FROM portfolio
        LATERAL VIEW EXPLODE(bonds) AS Attr
    """
    df_temp = spark.sql(query_explode)
    df_temp.createOrReplaceTempView("tempport")
    # Anti-join pattern: keep stock symbols with no match in any portfolio
    query_distinct = """
        SELECT DISTINCT s.symbol
        FROM stocks s
        LEFT OUTER JOIN tempport p ON s.symbol = p.Attr.symbol
        WHERE p.Attr.symbol IS NULL
        ORDER BY s.symbol
    """
    df_symbols = spark.sql(query_distinct)
    df_symbols.show(num, truncate=False)
    df_symbols.write.mode('overwrite').parquet(HDFSPATH + "home/heiserervalentin/nyse4.parquet")
    print("-> Wrote nyse4")
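
# For comparison only: Task D expressed with the DataFrame API instead of
# Spark SQL. A minimal sketch that assumes read_data() has already created
# the "stocks" and "portfolio" views; the function name is hypothetical and
# it is not called from main().
def symbols_not_in_portfolio_df(spark: SparkSession, num: int = 10) -> None:
    from pyspark.sql import functions as F
    # Explode the bonds array directly instead of using LATERAL VIEW
    held = (spark.table("portfolio")
            .select(F.explode("bonds").alias("Attr"))
            .select(F.col("Attr.symbol").alias("symbol"))
            .distinct())
    # A left anti join keeps only symbols that appear in no portfolio,
    # equivalent to the LEFT OUTER JOIN ... IS NULL pattern above
    df_symbols = (spark.table("stocks")
                  .select("symbol").distinct()
                  .join(held, on="symbol", how="left_anti")
                  .orderBy("symbol"))
    df_symbols.show(num, truncate=False)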
# --- Task E ---
def value_portfolio_2010(spark: SparkSession, num: int = 10) -> None:
    print("\n--- Task E: Portfolio Value 2010 ---")
    # 1. Explode the portfolio: one row per (pid, symbol, share count)
    query_portfolio = """
        SELECT pid, Attr.symbol AS symbol, Attr.num AS anzAktien
        FROM portfolio
        LATERAL VIEW EXPLODE(bonds) AS Attr
        ORDER BY pid
    """
    df_lview = spark.sql(query_portfolio)
    df_lview.createOrReplaceTempView("tempportfolio")
    # df_lview.show(num, truncate=False)  # Optional check

    # 2. Filter stocks: latest quote per symbol within 2010. The year filter
    #    must sit inside the subquery; filtering the outer query instead would
    #    drop every symbol whose overall latest quote lies outside 2010.
    query_stocks = """
        SELECT s.symbol, s.dt, s.close
        FROM stocks s
        INNER JOIN (
            SELECT symbol, MAX(dt) AS datum
            FROM stocks
            WHERE YEAR(dt) = 2010
            GROUP BY symbol
        ) AS grpStocks
            ON s.symbol = grpStocks.symbol AND s.dt = grpStocks.datum
        ORDER BY datum
    """
    df_2010 = spark.sql(query_stocks)
    df_2010.createOrReplaceTempView("tempstocks")
    # df_2010.show(num, truncate=False)  # Optional check

    # 3. Compute the value of each position (price x share count)
    query_value = """
        SELECT p.*, s.close * p.anzAktien AS wert
        FROM tempportfolio p
        INNER JOIN tempstocks s ON s.symbol = p.symbol
        ORDER BY p.pid
    """
    df_value = spark.sql(query_value)
    df_value.createOrReplaceTempView("tempvalue")
    # df_value.show(num, truncate=False)  # Optional check

    # 4. Aggregate the total value per portfolio
    query_sum = """
        SELECT pid, SUM(wert) AS gesamtwert
        FROM tempvalue
        GROUP BY pid
        ORDER BY pid
    """
    df_sum = spark.sql(query_sum)
    df_sum.show(num, truncate=False)
    df_sum.write.mode('overwrite').parquet(HDFSPATH + "home/heiserervalentin/nyse5.parquet")
    print("-> Wrote nyse5")


def main(scon, spark):
    read_data(spark)
    first_last_quotation(spark, 10)
    min_max_avg_close(spark, 10)
    sum_count_avg_portfolios(spark, 5)
    symbols_not_in_portfolio(spark, 5)
    value_portfolio_2010(spark, 10)


if __name__ == "__main__":
    main(scon, spark)
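
# Optional sanity check (hypothetical helper, not part of the assignment):
# read one of the result files back from HDFS to confirm the write succeeded.
# Uncomment and adapt the name as needed.
#
# def verify_output(spark: SparkSession, name: str = "nyse1") -> None:
#     df = spark.read.parquet(HDFSPATH + f"home/heiserervalentin/{name}.parquet")
#     df.printSchema()
#     df.show(5, truncate=False)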