Compare commits


1 commit

Author SHA1 Message Date
1b2de95b2e 12 2025-12-11 21:30:51 +01:00
2 changed files with 350 additions and 360 deletions

View File

@@ -1,219 +1,276 @@
from __future__ import annotations

from sparkstart import scon, spark
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import pandas as pd
import time

HDFSPATH = "hdfs://193.174.205.250:54310/"
HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/"


def read_parquets(spark: SparkSession):
    stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
    products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
    stations_df = spark.read.parquet(stations_path)
    stations_df.createOrReplaceTempView("german_stations")
    products_df = spark.read.parquet(products_path)
    products_df.createOrReplaceTempView("german_stations_data")
    stations_df.cache()
    products_df.cache()


def task_11a_rollup(spark: SparkSession, station_name="Kempten"):
    print(f"\n--- Aufgabe 11a: Rollup & Plotting für {station_name} ---")
    start_time = time.time()

    # 1. Find the station ID (case-insensitive search)
    sid_df = spark.sql(f"SELECT stationId FROM german_stations WHERE lower(station_name) LIKE '%{station_name.lower()}%'")
    try:
        sid = sid_df.collect()[0]['stationId']
        print(f"Station found: {station_name} -> ID {sid}")
    except IndexError:
        print(f"Station {station_name} nicht gefunden.")
        return

    # 2. Prepare the rollup input
    # FIX: parse the 'yyyyMMdd' string date into a real DATE first
    q_prep = f"""
        SELECT
            YEAR(TO_DATE(date, 'yyyyMMdd')) as yr,
            QUARTER(TO_DATE(date, 'yyyyMMdd')) as qt,
            MONTH(TO_DATE(date, 'yyyyMMdd')) as mo,
            DAY(TO_DATE(date, 'yyyyMMdd')) as da,
            TT_TU
        FROM german_stations_data
        WHERE stationId = {sid}
          AND TT_TU IS NOT NULL
          AND TT_TU > -50
          AND TT_TU < 60
    """
    spark.sql(q_prep).createOrReplaceTempView("data_prep")

    # 3. Rollup execution
    # Note: dates for quarters/months are reconstructed so every level has a valid date for plotting
    q_rollup = """
        SELECT
            yr, qt, mo, da,
            MIN(TT_TU) as min_temp,
            MAX(TT_TU) as max_temp,
            AVG(TT_TU) as avg_temp,
            -- Construct dates for plotting (handling the NULLs from ROLLUP)
            -- For Quarter: use the 1st month of the quarter
            DATE(concat_ws('-', yr, cast(qt*3-2 as int), '01')) as qt_date,
            -- For Month: use the 1st day of the month
            MAKE_DATE(yr, mo, 1) as mo_date,
            -- For Year: use Jan 1st
            MAKE_DATE(yr, 1, 1) as yr_date,
            -- For Day: use the actual date
            MAKE_DATE(yr, mo, da) as da_date
        FROM data_prep
        GROUP BY ROLLUP(yr, qt, mo, da)
    """
    df_rollup = spark.sql(q_rollup)
    df_rollup.cache()
    df_rollup.createOrReplaceTempView("station_rollup")

    # Trigger an action so the cache is materialised
    count = df_rollup.count()
    print(f"Rollup berechnet. Zeilen: {count}. Dauer: {time.time() - start_time:.2f}s")

    # --- PLOTTING ---

    # Plot 1: daily values (last 3 years)
    # Filter: all levels must be present (not NULL)
    q_days = """
        SELECT da_date as date, avg_temp
        FROM station_rollup
        WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NOT NULL
          AND yr >= (SELECT MAX(yr) - 2 FROM station_rollup WHERE yr IS NOT NULL)
        ORDER BY date
    """
    pdf_days = spark.sql(q_days).toPandas()
    if pdf_days.empty:
        print("Warnung: Keine Daten für Tages-Plot gefunden.")
    else:
        plt.figure(1, figsize=(10, 5))
        plt.plot(pdf_days['date'], pdf_days['avg_temp'], label='Daily Avg', linewidth=0.5)
        plt.title(f"{station_name}: Daily Average (Last 3 Years)")
        plt.xlabel('Date')
        plt.ylabel('Temp °C')
        plt.tight_layout()
        plt.show()

    # Plot 2: monthly values (last 20 years)
    # Filter: day is NULL (aggregation level), but month is NOT NULL
    q_months = """
        SELECT mo_date as date, avg_temp
        FROM station_rollup
        WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NULL
          AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL)
        ORDER BY date
    """
    pdf_months = spark.sql(q_months).toPandas()
    if not pdf_months.empty:
        plt.figure(2, figsize=(10, 5))
        plt.plot(pdf_months['date'], pdf_months['avg_temp'], color='green', label='Monthly Avg')
        plt.title(f"{station_name}: Monthly Average (Last 20 Years)")
        plt.xlabel('Date')
        plt.ylabel('Temp °C')
        plt.tight_layout()
        plt.show()

    # Plot 3: quarterly values
    # Filter: month is NULL, quarter is NOT NULL
    q_quarters = """
        SELECT qt_date as date, avg_temp
        FROM station_rollup
        WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NULL AND da IS NULL
          AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL)
        ORDER BY date
    """
    pdf_quarters = spark.sql(q_quarters).toPandas()
    if not pdf_quarters.empty:
        plt.figure(3, figsize=(10, 5))
        plt.plot(pdf_quarters['date'], pdf_quarters['avg_temp'], color='orange', marker='o', linestyle='-', label='Quarterly Avg')
        plt.title(f"{station_name}: Quarterly Average (Last 20 Years)")
        plt.tight_layout()
        plt.show()

    # Plot 4: yearly values
    # Filter: quarter is NULL, year is NOT NULL
    q_years = """
        SELECT yr_date as date, min_temp, max_temp, avg_temp
        FROM station_rollup
        WHERE yr IS NOT NULL AND qt IS NULL AND mo IS NULL AND da IS NULL
          AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL)
        ORDER BY date
    """
    pdf_years = spark.sql(q_years).toPandas()
    if not pdf_years.empty:
        plt.figure(4, figsize=(10, 5))
        plt.plot(pdf_years['date'], pdf_years['max_temp'], color='red', label='Max')
        plt.plot(pdf_years['date'], pdf_years['avg_temp'], color='black', label='Avg')
        plt.plot(pdf_years['date'], pdf_years['min_temp'], color='blue', label='Min')
        plt.title(f"{station_name}: Yearly Aggregates (Last 20 Years)")
        plt.legend()
        plt.tight_layout()
        plt.show()


def task_11b_rank(spark: SparkSession):
    print("\n--- Aufgabe 11b: TempMonat Ranking ---")

    # Monthly min/max/avg per station; 'date' is a 'yyyyMMdd' string,
    # so the year starts at position 1 and the month at position 5.
    q_tempmonat = """
        SELECT
            d.stationId,
            s.station_name,
            SUBSTR(CAST(d.date AS STRING), 1, 4) as year,
            SUBSTR(CAST(d.date AS STRING), 5, 2) as month,
            MIN(d.TT_TU) as min_t,
            MAX(d.TT_TU) as max_t,
            AVG(d.TT_TU) as avg_t
        FROM german_stations_data d
        JOIN german_stations s ON d.stationId = s.stationId
        WHERE d.TT_TU IS NOT NULL AND d.TT_TU > -50
        GROUP BY d.stationId, s.station_name, year, month
    """
    df_tm = spark.sql(q_tempmonat)
    df_tm.createOrReplaceTempView("tempmonat")

    # 1. Ranking partitioned by month within 2015
    print(" > Berechne Ranking für 2015 (partitioniert nach Monat)...")
    q_rank_2015 = """
        SELECT
            month, station_name, min_t,
            RANK() OVER (PARTITION BY month ORDER BY min_t ASC) as rank_min,
            RANK() OVER (PARTITION BY month ORDER BY max_t ASC) as rank_max,
            RANK() OVER (PARTITION BY month ORDER BY avg_t ASC) as rank_avg
        FROM tempmonat
        WHERE year = '2015'
        ORDER BY rank_min, month
    """
    spark.sql(q_rank_2015).show(10)

    # 2. Global ranking (across all months and years)
    print(" > Berechne Ranking global (kälteste Monate aller Zeiten)...")
    q_rank_global = """
        SELECT
            year, month, station_name, min_t,
            RANK() OVER (ORDER BY min_t ASC) as rank_min,
            RANK() OVER (ORDER BY max_t ASC) as rank_max,
            RANK() OVER (ORDER BY avg_t ASC) as rank_avg
        FROM tempmonat
        ORDER BY rank_min
    """
    spark.sql(q_rank_global).show(10)
    print("11b: Fertig.")


def task_11c_groupingsets(spark: SparkSession):
    print("\n--- Aufgabe 11c: Grouping Sets ---")

    # 'date' is a 'yyyyMMdd' string: year at position 1, month at position 5.
    q_prep = """
        SELECT
            CAST(SUBSTR(CAST(d.date AS STRING), 1, 4) AS INT) as year,
            CAST(SUBSTR(CAST(d.date AS STRING), 5, 2) AS INT) as month,
            s.station_name,
            s.bundesland,
            d.TT_TU
        FROM german_stations_data d
        JOIN german_stations s ON d.stationId = s.stationId
        WHERE d.TT_TU > -50
    """
    spark.sql(q_prep).createOrReplaceTempView("gs_base")

    q_sets = """
        SELECT
            year,
            month,
            bundesland,
            station_name,
            MIN(TT_TU) as min_t, MAX(TT_TU) as max_t, AVG(TT_TU) as avg_t
        FROM gs_base
        GROUP BY GROUPING SETS (
            (year, bundesland),
            (year, station_name),
            (month, bundesland)
        )
    """
    df_gs = spark.sql(q_sets)
    df_gs.cache()
    df_gs.createOrReplaceTempView("grouping_result")

    # Action to materialise the cache
    df_gs.count()
    print("Grouping Sets berechnet.")

    print("Auswahl 1: Jahr & Bundesland")
    spark.sql("SELECT year, bundesland, avg_t FROM grouping_result WHERE station_name IS NULL AND month IS NULL ORDER BY year DESC, bundesland").show(5)

    print("Auswahl 2: Jahr & Station")
    spark.sql("SELECT year, station_name, avg_t FROM grouping_result WHERE bundesland IS NULL AND month IS NULL ORDER BY year DESC, station_name").show(5)

    print("Auswahl 3: Monat & Bundesland (Jahreszeitlicher Verlauf je Land)")
    spark.sql("SELECT month, bundesland, avg_t FROM grouping_result WHERE year IS NULL AND station_name IS NULL ORDER BY bundesland, month").show(5)


def main(scon, spark):
    read_parquets(spark)
    # Aufgabe 11
    task_11a_rollup(spark, station_name="Kempten")
    task_11b_rank(spark)
    task_11c_groupingsets(spark)


if __name__ == '__main__':
    main(scon, spark)
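The level filters in task_11a_rollup and task_11c_groupingsets pick an aggregation level by testing which ROLLUP / GROUPING SETS columns are NULL. The same selection can be written with Spark's grouping_id(), which also stays correct if a grouping column itself contains NULLs. A minimal, self-contained sketch (local session, toy table and column names are illustrative and not part of this commit):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("rollup_levels_sketch").getOrCreate()

# Toy data: (year, month, temperature) with made-up values
rows = [(2019, 1, -2.0), (2019, 7, 19.5), (2020, 1, 0.5), (2020, 7, 21.0)]
spark.createDataFrame(rows, ["yr", "mo", "temp"]).createOrReplaceTempView("toy_temps")

# grouping_id() encodes which columns were aggregated away in each ROLLUP level,
# so levels can be selected without NULL checks on yr/mo.
spark.sql("""
    SELECT yr, mo, AVG(temp) AS avg_temp, grouping_id(yr, mo) AS gid
    FROM toy_temps
    GROUP BY ROLLUP(yr, mo)
""").createOrReplaceTempView("toy_rollup")

# gid = 0 -> (yr, mo) detail rows, gid = 1 -> yearly rows, gid = 3 -> grand total
spark.sql("SELECT yr, avg_temp FROM toy_rollup WHERE gid = 1 ORDER BY yr").show()

In task_11a this kind of predicate could replace the combined IS NULL / IS NOT NULL conditions on yr, qt, mo and da.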

View File

@@ -1,190 +1,123 @@
from sparkstart import scon, spark
from pyspark.sql import SparkSession
import time
import matplotlib.pyplot as plt
import pandas as pd

HDFSPATH_STATIONS = "hdfs://193.174.205.250:54310/home/heiserervalentin/"
HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/"


def init_view_stocks(spark):
    """Loads the stocks data for Aufgabe 12."""
    # Note: adjust the paths if they differ on the cluster
    spark.read.parquet(HDFSPATH_STOCKS + "stocks.parquet").createOrReplaceTempView("stocks")
    spark.read.parquet(HDFSPATH_STOCKS + "portfolio.parquet").createOrReplaceTempView("portfolio")


# ---------------------------------------------------------
# AUFGABE 12
# ---------------------------------------------------------
def task_12_stocks_analysis(spark: SparkSession):
    print("\n--- Aufgabe 12: Stocks & Portfolio ---")

    # a) First and last quotation date per symbol
    print("a) Min/Max Datum pro Symbol")
    t0 = time.time()
    q_a = """
        SELECT symbol, MIN(date) as first_date, MAX(date) as last_date
        FROM stocks
        GROUP BY symbol
        ORDER BY symbol
    """
    spark.sql(q_a).show(5)
    print(f"Zeit a): {time.time()-t0:.2f}s")

    # b) Aggregations for 2009
    print("\nb) High/Low/Avg Close 2009")
    t0 = time.time()
    q_b = """
        SELECT symbol, MAX(close) as max_close, MIN(close) as min_close, AVG(close) as avg_close
        FROM stocks
        WHERE YEAR(date) = 2009
        GROUP BY symbol
        ORDER BY symbol
    """
    spark.sql(q_b).show(5)
    print(f"Zeit b): {time.time()-t0:.2f}s")

    # c) Lateral view (explode the portfolio holdings)
    print("\nc) Lateral View: Aktien in Portfolios")
    t0 = time.time()
    q_c = """
        SELECT
            h.symbol,
            SUM(h.amount) as total_shares,
            COUNT(p.portfolioId) as num_portfolios,
            AVG(h.amount) as avg_per_portfolio
        FROM portfolio p
        LATERAL VIEW explode(holdings) t AS h
        GROUP BY h.symbol
        ORDER BY h.symbol
    """
    spark.sql(q_c).show(5)
    print(f"Zeit c): {time.time()-t0:.2f}s")

    # d) Symbols held in no portfolio (anti join)
    print("\nd) Symbole ohne Portfolio")
    t0 = time.time()
    q_d = """
        SELECT DISTINCT s.symbol
        FROM stocks s
        LEFT ANTI JOIN (
            SELECT DISTINCT h.symbol
            FROM portfolio p
            LATERAL VIEW explode(holdings) t AS h
        ) p_sym ON s.symbol = p_sym.symbol
        ORDER BY s.symbol
    """
    spark.sql(q_d).show(5)
    print(f"Zeit d): {time.time()-t0:.2f}s")

    input(">> 12 a-d fertig. Check UI. Enter für e)...")

    # e) Portfolio value at the end of 2010
    print("\ne) Portfolio Bewertung Ende 2010")
    t0 = time.time()

    # Step 1: last closing price per symbol in 2010
    q_last_price = """
        SELECT symbol, close
        FROM (
            SELECT
                symbol,
                close,
                ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY date DESC) as rn
            FROM stocks
            WHERE YEAR(date) = 2010
        ) tmp
        WHERE rn = 1
    """
    spark.sql(q_last_price).createOrReplaceTempView("stocks_2010_end")

    # Step 2: explode the portfolio, join with the prices, multiply and sum
    q_val = """
        SELECT
            p.portfolioId,
            SUM(h.amount * s.close) as portfolio_value_2010
        FROM portfolio p
        LATERAL VIEW explode(holdings) t AS h
        JOIN stocks_2010_end s ON h.symbol = s.symbol
        GROUP BY p.portfolioId
        ORDER BY p.portfolioId
    """
    spark.sql(q_val).show(5)
    print(f"Zeit e): {time.time()-t0:.2f}s")


def main(scon, spark):
    # Aufgabe 12
    init_view_stocks(spark)
    task_12_stocks_analysis(spark)


if __name__ == '__main__':
    main(scon, spark)
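Both the holdings aggregation in 12c and the anti join in 12d rely on each portfolio row exposing a holdings array of (symbol, amount) structs. A small self-contained sketch of that explode-plus-anti-join pattern on made-up data (local session; schema and values are assumptions for illustration, not taken from the cluster files):

from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.master("local[1]").appName("portfolio_sketch").getOrCreate()

# Toy portfolio: an array of (symbol, amount) structs per portfolio row
portfolio = spark.createDataFrame([
    Row(portfolioId=1, holdings=[Row(symbol="AAA", amount=10), Row(symbol="BBB", amount=5)]),
    Row(portfolioId=2, holdings=[Row(symbol="AAA", amount=3)]),
])
portfolio.createOrReplaceTempView("portfolio")

# Toy stocks table with one symbol that appears in no portfolio
spark.createDataFrame([("AAA",), ("BBB",), ("CCC",)], ["symbol"]).createOrReplaceTempView("stocks")

# Explode the holdings array and aggregate per symbol (pattern used in 12c)
spark.sql("""
    SELECT h.symbol, SUM(h.amount) AS total_shares, COUNT(p.portfolioId) AS num_portfolios
    FROM portfolio p
    LATERAL VIEW explode(holdings) t AS h
    GROUP BY h.symbol
""").show()

# Symbols held in no portfolio via LEFT ANTI JOIN (pattern used in 12d)
spark.sql("""
    SELECT s.symbol
    FROM stocks s
    LEFT ANTI JOIN (
        SELECT DISTINCT h.symbol
        FROM portfolio p
        LATERAL VIEW explode(holdings) t AS h
    ) held ON s.symbol = held.symbol
""").show()

With this toy input the first query returns totals for AAA and BBB, and the anti join returns only CCC.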