BigData/Aufgabe 12/Aufgabe12.py
from sparkstart import scon, spark
from pyspark.sql import SparkSession
HDFSPATH = "hdfs://193.174.205.250:54310/"
SOURCEPATH = HDFSPATH + "stocks/"
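# Note: `sparkstart` is the course-provided module that supplies a
# preconfigured SparkSession (`spark`) and SparkContext (`scon`). If it is
# unavailable, a plain local session could be built instead (a sketch, not
# the course setup):
#     spark = SparkSession.builder.appName("Aufgabe12").getOrCreate()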
def read_data(spark: SparkSession) -> None:
    """
    Loads the existing Parquet files from HDFS into Spark views.
    """
    print(f"--- Loading Views from {SOURCEPATH} ---")
    try:
        # Load stocks
        spark.read.parquet(SOURCEPATH + "stocks.parquet").createOrReplaceTempView("stocks")
        print("-> View 'stocks' loaded.")
        # Load portfolio
        spark.read.parquet(SOURCEPATH + "portfolio.parquet").createOrReplaceTempView("portfolio")
        print("-> View 'portfolio' loaded.")
    except Exception as e:
        print(f"CRITICAL ERROR: Could not load data. {e}")
        print("Please check if the path exists in HDFS.")
# --- Aufgabe A ---
def first_last_quotation(spark: SparkSession, num: int = 10) -> None:
    """
    Determines the oldest and newest quotation date per symbol.
    """
    print("\n--- Aufgabe A: First/Last Quotation ---")
    query = """
        SELECT symbol,
               MIN(dt) AS altNotierung,
               MAX(dt) AS neuNotierung
        FROM stocks
        GROUP BY symbol
        ORDER BY symbol
    """
    df_quotation = spark.sql(query)
    df_quotation.show(num, truncate=False)
    df_quotation.write.mode('overwrite').parquet(HDFSPATH + "home/heiserervalentin/nyse1.parquet")
    print("-> Saved nyse1.")
# --- Aufgabe B ---
def min_max_avg_close(spark: SparkSession, num: int = 10) -> None:
    """
    Computes the minimum, maximum, and average closing price per symbol for 2009.
    """
    print("\n--- Aufgabe B: Min/Max/Avg Close 2009 ---")
    query = """
        SELECT symbol,
               MIN(close) AS minClose,
               MAX(close) AS maxClose,
               AVG(close) AS avgClose
        FROM stocks
        WHERE YEAR(dt) = 2009
        GROUP BY symbol
        ORDER BY symbol
    """
    df_close = spark.sql(query)
    df_close.show(num, truncate=False)
    df_close.write.mode('overwrite').parquet(HDFSPATH + "home/heiserervalentin/nyse2.parquet")
    print("-> Saved nyse2.")
# --- Aufgabe C ---
def sum_count_avg_portfolios(spark: SparkSession, num: int = 10) -> None:
    """
    Counts portfolios and sums/averages the share count per symbol.
    """
    print("\n--- Aufgabe C: Portfolio Aggregations ---")
    # 1. Explode the bonds array into one row per (pid, position)
    query_explode = """
        SELECT pid, Attr
        FROM portfolio
        LATERAL VIEW EXPLODE(bonds) AS Attr
    """
    df_temp = spark.sql(query_explode)
    df_temp.createOrReplaceTempView("temp")
    # 2. Aggregate per symbol
    query_agg = """
        SELECT Attr.symbol AS symbol,
               COUNT(pid) AS anzpid,
               SUM(Attr.num) AS anzAktien,
               AVG(Attr.num) AS avgAnzAktien
        FROM temp
        GROUP BY symbol
        ORDER BY symbol
    """
    df_sum_sel_cnt_avg = spark.sql(query_agg)
    df_sum_sel_cnt_avg.show(num, truncate=False)
    df_sum_sel_cnt_avg.write.mode('overwrite').parquet(HDFSPATH + "home/heiserervalentin/nyse3.parquet")
    print("-> Saved nyse3.")
# --- Aufgabe D ---
def symbols_not_in_portfolio(spark: SparkSession, num: int = 10) -> None:
    """
    Lists all stock symbols that do not appear in any portfolio.
    """
    print("\n--- Aufgabe D: Symbols not in Portfolio ---")
    query_explode = """
        SELECT Attr
        FROM portfolio
        LATERAL VIEW EXPLODE(bonds) AS Attr
    """
    df_temp = spark.sql(query_explode)
    df_temp.createOrReplaceTempView("tempport")
    # Anti-join pattern: keep stock symbols with no match in any portfolio
    query_distinct = """
        SELECT DISTINCT s.symbol
        FROM stocks s
        LEFT OUTER JOIN tempport p ON s.symbol = p.Attr.symbol
        WHERE p.Attr.symbol IS NULL
        ORDER BY s.symbol
    """
    df_symbols = spark.sql(query_distinct)
    df_symbols.show(num, truncate=False)
    df_symbols.write.mode('overwrite').parquet(HDFSPATH + "home/heiserervalentin/nyse4.parquet")
    print("-> Saved nyse4.")
# --- Aufgabe E ---
def value_portfolio_2010(spark: SparkSession, num: int = 10) -> None:
    """
    Computes the total value of each portfolio based on the latest 2010 closing price.
    """
    print("\n--- Aufgabe E: Portfolio Value 2010 ---")
    # 1. Explode the portfolio into one row per position
    query_portfolio = """
        SELECT pid, Attr.symbol AS symbol, Attr.num AS anzAktien
        FROM portfolio
        LATERAL VIEW EXPLODE(bonds) AS Attr
        ORDER BY pid
    """
    df_lview = spark.sql(query_portfolio)
    df_lview.createOrReplaceTempView("tempportfolio")
    # df_lview.show(num, truncate=False)  # Optional for inspection
    # 2. Filter stocks to the latest 2010 quotation per symbol.
    #    The year filter belongs inside the subquery; filtering after the join
    #    would silently drop symbols whose overall latest quote predates 2010.
    query_stocks = """
        SELECT s.symbol, s.dt, s.close
        FROM stocks s
        INNER JOIN (
            SELECT symbol, MAX(dt) AS datum
            FROM stocks
            WHERE YEAR(dt) = 2010
            GROUP BY symbol
        ) AS grpStocks
        ON s.symbol = grpStocks.symbol AND s.dt = grpStocks.datum
        ORDER BY s.dt
    """
    df_2010 = spark.sql(query_stocks)
    df_2010.createOrReplaceTempView("tempstocks")
    # df_2010.show(num, truncate=False)  # Optional for inspection
    # 3. Compute the value per position (join)
    query_value = """
        SELECT p.*, s.close * p.anzAktien AS wert
        FROM tempportfolio p, tempstocks s
        WHERE s.symbol = p.symbol
        ORDER BY p.pid
    """
    df_value = spark.sql(query_value)
    df_value.createOrReplaceTempView("tempvalue")
    # df_value.show(num, truncate=False)  # Optional for inspection
    # 4. Aggregate the total value per portfolio
    query_sum = """
        SELECT pid, SUM(wert) AS gesamtwert
        FROM tempvalue
        GROUP BY pid
        ORDER BY pid
    """
    df_sum = spark.sql(query_sum)
    df_sum.show(num, truncate=False)
    df_sum.write.mode('overwrite').parquet(HDFSPATH + "home/heiserervalentin/nyse5.parquet")
    print("-> Saved nyse5.")
def main(scon, spark):
    read_data(spark)
    first_last_quotation(spark, 10)
    min_max_avg_close(spark, 10)
    sum_count_avg_portfolios(spark, 5)
    symbols_not_in_portfolio(spark, 5)
    value_portfolio_2010(spark, 10)

if __name__ == "__main__":
    main(scon, spark)