2025-12-11 20:55:44 +01:00
parent d18e9823e5
commit a0ac5dbf36
4 changed files with 394 additions and 330 deletions


@@ -1,366 +1,219 @@
from __future__ import annotations
from sparkstart import scon, spark
from pyspark.sql import SparkSession
import time
import matplotlib.pyplot as plt
import pandas as pd
HDFSPATH_STATIONS = "hdfs://193.174.205.250:54310/home/heiserervalentin/"
HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/"
HDFSPATH = "hdfs://193.174.205.250:54310/"
def init_view_stations(spark):
    """Loads the station data for Task 11."""
    s_path = HDFSPATH_STATIONS + "german_stations.parquet"
    d_path = HDFSPATH_STATIONS + "german_stations_data.parquet"
    spark.read.parquet(s_path).createOrReplaceTempView("german_stations")
    spark.read.parquet(d_path).createOrReplaceTempView("german_stations_data")

def init_view_stocks(spark):
    """Loads the stocks data for Task 12."""
    # Note: adjust the paths if they live elsewhere on the cluster
    spark.read.parquet(HDFSPATH_STOCKS + "stocks.parquet").createOrReplaceTempView("stocks")
    spark.read.parquet(HDFSPATH_STOCKS + "portfolio.parquet").createOrReplaceTempView("portfolio")

def read_parquet_tables(spark: SparkSession) -> None:
    """Registers the station Parquet files as temp views."""
    stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
    products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
    spark.read.parquet(stations_path).createOrReplaceTempView("german_stations")
    spark.read.parquet(products_path).createOrReplaceTempView("german_stations_data")
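
# Minimal usage sketch (not called from main): it assumes the HDFS paths above are reachable
# from the driver; it registers the views and then lists what the session catalog contains,
# as a quick check that the Parquet files were picked up.
def check_registered_views(spark: SparkSession) -> None:
    read_parquet_tables(spark)
    # listTables() reports the temp views registered in the current session
    for table in spark.catalog.listTables():
        print(table.name, table.tableType)
    # cheap sanity check on the schema of the measurement data
    spark.table("german_stations_data").printSchema()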
# ---------------------------------------------------------
# TASK 11
# ---------------------------------------------------------

# --- Task A ---
def task_11a_rollup(spark: SparkSession, station_name="Kempten"):
    print(f"\n--- Task 11a: rollup & plotting for {station_name} ---")
    start = time.time()

    # 1. Find the station ID
    sid_df = spark.sql(f"SELECT stationId FROM german_stations WHERE station_name LIKE '%{station_name}%'")
    try:
        sid = sid_df.collect()[0]['stationId']
    except IndexError:
        print(f"Station {station_name} not found.")
        return

    # 2. Prepare the rollup query
    q_prep = f"""
        SELECT
            YEAR(date) as yr,
            QUARTER(date) as qt,
            MONTH(date) as mo,
            DAY(date) as da,
            TT_TU
        FROM german_stations_data
        WHERE stationId = {sid} AND TT_TU IS NOT NULL AND TT_TU > -50
    """
    spark.sql(q_prep).createOrReplaceTempView("data_prep")

    q_rollup = """
        SELECT
            yr, qt, mo, da,
            MIN(TT_TU) as min_temp,
            MAX(TT_TU) as max_temp,
            AVG(TT_TU) as avg_temp,
            -- date construction for the plots
            DATE(STRING(yr) || '-' || STRING(qt*3-2) || '-01') as qt_date,
            MAKE_DATE(yr, mo, 1) as mo_date,
            MAKE_DATE(yr, 1, 1) as yr_date,
            MAKE_DATE(yr, mo, da) as da_date
        FROM data_prep
        GROUP BY ROLLUP(yr, qt, mo, da)
    """
    df_rollup = spark.sql(q_rollup)
    df_rollup.cache()
    df_rollup.createOrReplaceTempView("station_rollup")

    # Trigger an action for the cache & time measurement
    count = df_rollup.count()
    print(f"Rollup computed. Rows: {count}. Duration: {time.time() - start:.2f}s")
    input(">> 11a: check the Spark UI (Stages/Storage) now. Press Enter for the plots...")

    # Plot 1: daily values (last 3 years of the data)
    q_days = """
        SELECT da_date as date, avg_temp
        FROM station_rollup
        WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NOT NULL
          AND yr >= (SELECT MAX(yr) - 2 FROM station_rollup)
        ORDER BY date
    """
    pdf_days = spark.sql(q_days).toPandas()
    plt.figure(1, figsize=(10, 5))
    plt.plot(pdf_days['date'], pdf_days['avg_temp'], label='Daily Avg', linewidth=0.5)
    plt.title(f"{station_name}: Daily Average (Last 3 Years)")
    plt.xlabel('Date')
    plt.ylabel('Temp °C')
    plt.tight_layout()
    plt.show()

    # Plot 2: monthly values (last 20 years)
    q_months = """
        SELECT mo_date as date, avg_temp
        FROM station_rollup
        WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NULL
          AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup)
        ORDER BY date
    """
    pdf_months = spark.sql(q_months).toPandas()
    plt.figure(2, figsize=(10, 5))
    plt.plot(pdf_months['date'], pdf_months['avg_temp'], color='green', label='Monthly Avg')
    plt.title(f"{station_name}: Monthly Average (Last 20 Years)")
    plt.xlabel('Date')
    plt.ylabel('Temp °C')
    plt.tight_layout()
    plt.show()

    # Plot 3: quarterly values
    q_quarters = """
        SELECT qt_date as date, avg_temp
        FROM station_rollup
        WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NULL AND da IS NULL
          AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup)
        ORDER BY date
    """
    pdf_quarters = spark.sql(q_quarters).toPandas()
    plt.figure(3, figsize=(10, 5))
    plt.plot(pdf_quarters['date'], pdf_quarters['avg_temp'], color='orange', marker='o', linestyle='-', label='Quarterly Avg')
    plt.title(f"{station_name}: Quarterly Average (Last 20 Years)")
    plt.show()

    # Plot 4: yearly values
    q_years = """
        SELECT yr_date as date, min_temp, max_temp, avg_temp
        FROM station_rollup
        WHERE yr IS NOT NULL AND qt IS NULL AND mo IS NULL AND da IS NULL
          AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup)
        ORDER BY date
    """
    pdf_years = spark.sql(q_years).toPandas()
    plt.figure(4, figsize=(10, 5))
    plt.plot(pdf_years['date'], pdf_years['max_temp'], color='red', label='Max')
    plt.plot(pdf_years['date'], pdf_years['avg_temp'], color='black', label='Avg')
    plt.plot(pdf_years['date'], pdf_years['min_temp'], color='blue', label='Min')
    plt.title(f"{station_name}: Yearly Aggregates (Last 20 Years)")
    plt.legend()
    plt.grid(True)
    plt.show()
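
# Illustrative sketch (not called from main): instead of pausing with input() and checking the
# Spark UI by hand, the cached rollup view can also be inspected programmatically. Assumes the
# station_rollup view registered by task_11a_rollup.
def inspect_rollup_cache(spark: SparkSession) -> None:
    # True once the view has been marked for caching via df.cache()
    print("station_rollup cached:", spark.catalog.isCached("station_rollup"))
    # formatted physical plan; roughly the information shown on the SQL tab of the Spark UI
    spark.table("station_rollup").explain(mode="formatted")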
def create_mma_rollup(spark: SparkSession, station_id: int):
    query = f"""
        WITH processed_data AS (
            SELECT
                TT_TU,
                hour AS messtunde,
                TO_DATE(SUBSTR(date, 1, 4), 'yyyy') AS jahr,
                TO_DATE(CONCAT(SUBSTR(date, 1, 4), '-', LPAD(CAST(QUARTER(TO_DATE(date, 'yyyyMMdd'))*3-2 AS STRING), 2, '0'), '-01')) AS quartal,
                TO_DATE(CONCAT(SUBSTR(date, 1, 4), '-', SUBSTR(date, 5, 2), '-01')) AS monat,
                TO_DATE(date, 'yyyyMMdd') AS tag
            FROM german_stations_data
            WHERE stationId = {station_id}
              AND TT_TU IS NOT NULL
              AND TT_TU <> -999
        )
        SELECT
            MIN(TT_TU) AS minTemperatur,
            MAX(TT_TU) AS maxTemperatur,
            AVG(TT_TU) AS avgTemperatur,
            jahr,
            quartal,
            monat,
            tag,
            messtunde
        FROM processed_data
        GROUP BY ROLLUP (jahr, quartal, monat, tag, messtunde)
        ORDER BY jahr, quartal, monat, tag, messtunde
    """
    df = spark.sql(query)
    df.createOrReplaceTempView("mmacdcdata")
    df.cache()
    df.show(10)

def plot_date_values(spark: SparkSession, level: str):
    filters = {
        "days": "YEAR(jahr) > 2017 AND YEAR(jahr) < 2021 AND messtunde IS NULL AND tag IS NOT NULL",
        "months": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NOT NULL",
        "quartals": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NULL AND quartal IS NOT NULL",
        "years": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NULL AND quartal IS NULL AND jahr IS NOT NULL"
    }
    x_col = {"days": "tag", "months": "monat", "quartals": "quartal", "years": "jahr"}

    pdf = spark.sql(f"SELECT * FROM mmacdcdata WHERE {filters[level]}").toPandas()
    if pdf.empty:
        return

    plt.figure(figsize=(10, 5))
    plt.plot(pdf[x_col[level]], pdf["maxTemperatur"], "red", label="Max")
    plt.plot(pdf[x_col[level]], pdf["avgTemperatur"], "green", label="Avg")
    plt.plot(pdf[x_col[level]], pdf["minTemperatur"], "blue", label="Min")
    plt.title(f"{level.capitalize()}")
    plt.legend()
    plt.grid(True)
    plt.show()
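
# Illustrative sketch (not called from main): the IS NULL filters above rely on the subtotal
# rows that ROLLUP emits. Spark SQL also exposes grouping_id(), which labels each output row
# with its aggregation level directly, so NULL measurements cannot be mistaken for subtotal
# rows. Like the queries above, it assumes date is stored as a yyyyMMdd string.
def rollup_levels_example(spark: SparkSession, station_id: int) -> None:
    df = spark.sql(f"""
        WITH prep AS (
            SELECT
                YEAR(TO_DATE(date, 'yyyyMMdd'))  AS jahr,
                MONTH(TO_DATE(date, 'yyyyMMdd')) AS monat,
                TT_TU
            FROM german_stations_data
            WHERE stationId = {station_id} AND TT_TU IS NOT NULL AND TT_TU <> -999
        )
        SELECT
            jahr,
            monat,
            AVG(TT_TU) AS avgTemperatur,
            grouping_id(jahr, monat) AS level_id
        FROM prep
        GROUP BY ROLLUP (jahr, monat)
    """)
    # level_id = 0 -> (jahr, monat) detail rows, 1 -> per-year subtotals, 3 -> grand total
    df.filter("level_id = 1").orderBy("jahr").show(5)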
# --- Task B ---
def task_11b_rank(spark: SparkSession):
    print("\n--- Task 11b: TempMonat ranking ---")
    start = time.time()

    q_tempmonat = """
        SELECT
            d.stationId,
            s.station_name,
            SUBSTR(CAST(d.date AS STRING), 1, 4) as year,
            SUBSTR(CAST(d.date AS STRING), 6, 2) as month,
            MIN(d.TT_TU) as min_t,
            MAX(d.TT_TU) as max_t,
            AVG(d.TT_TU) as avg_t
        FROM german_stations_data d
        JOIN german_stations s ON d.stationId = s.stationId
        WHERE d.TT_TU IS NOT NULL AND d.TT_TU > -50
        GROUP BY d.stationId, s.station_name, year, month
    """
    df_tm = spark.sql(q_tempmonat)
    df_tm.createOrReplaceTempView("tempmonat")

    # 1. Ranking partitioned by month within the year 2015
    print(" > Computing ranking for 2015 (partitioned by month)...")
    q_rank_2015 = """
        SELECT
            month, station_name, min_t,
            RANK() OVER (PARTITION BY month ORDER BY min_t ASC) as rank_min,
            RANK() OVER (PARTITION BY month ORDER BY max_t ASC) as rank_max,
            RANK() OVER (PARTITION BY month ORDER BY avg_t ASC) as rank_avg
        FROM tempmonat
        WHERE year = '2015'
        ORDER BY rank_min, month
    """
    spark.sql(q_rank_2015).show(10)

    # 2. Global ranking (across all months/years)
    print(" > Computing global ranking (coldest months of all time)...")
    q_rank_global = """
        SELECT
            year, month, station_name, min_t,
            RANK() OVER (ORDER BY min_t ASC) as rank_min,
            RANK() OVER (ORDER BY max_t ASC) as rank_max,
            RANK() OVER (ORDER BY avg_t ASC) as rank_avg
        FROM tempmonat
        ORDER BY rank_min
    """
    spark.sql(q_rank_global).show(10)
    print(f"Duration 11b: {time.time() - start:.2f}s")
    input(">> 11b: check the Spark UI (Jobs/Stages). Press Enter...")

def create_tempmonat(spark: SparkSession):
    query = """
        WITH base_data AS (
            SELECT
                d.stationId,
                gs.station_name AS stationsname,
                d.TT_TU,
                TO_DATE(SUBSTR(d.date, 1, 4), 'yyyy') AS jahr_val,
                TO_DATE(CONCAT(SUBSTR(d.date, 1, 4), '-', SUBSTR(d.date, 5, 2), '-01')) AS monat_val
            FROM german_stations_data d
            JOIN german_stations gs ON d.stationId = gs.stationId
            WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999
        )
        SELECT
            stationId,
            stationsname,
            MIN(TT_TU) AS minTemperatur,
            MAX(TT_TU) AS maxTemperatur,
            AVG(TT_TU) AS avgTemperatur,
            jahr_val AS jahr,
            monat_val AS monat
        FROM base_data
        GROUP BY stationId, stationsname, jahr_val, monat_val
    """
    spark.sql(query).cache().createOrReplaceTempView("tempmonat")

def rank_temperatures(spark: SparkSession, limit: int, year: int = None):
    where_clause = f"WHERE YEAR(jahr) = {year}" if year else ""
    query = f"""
        SELECT stationid, stationsname, monat, minTemperatur,
               RANK() OVER (ORDER BY minTemperatur ASC) AS rangMIN,
               maxTemperatur,
               RANK() OVER (ORDER BY maxTemperatur DESC) AS rangMAX,
               avgTemperatur,
               RANK() OVER (ORDER BY avgTemperatur DESC) AS rangAVG
        FROM tempmonat
        {where_clause}
        ORDER BY rangMIN
    """
    spark.sql(query).show(limit, truncate=False)
def task_11c_groupingsets(spark: SparkSession):
    print("\n--- Task 11c: grouping sets ---")
    start = time.time()

    q_prep = """
        SELECT
            CAST(SUBSTR(CAST(d.date AS STRING), 1, 4) AS INT) as year,
            CAST(SUBSTR(CAST(d.date AS STRING), 6, 2) AS INT) as month,
            s.station_name,
            s.bundesland,
            d.TT_TU
        FROM german_stations_data d
        JOIN german_stations s ON d.stationId = s.stationId
        WHERE d.TT_TU > -50
    """
    spark.sql(q_prep).createOrReplaceTempView("gs_base")

    q_sets = """
        SELECT
            year,
            month,
            bundesland,
            station_name,
            MIN(TT_TU) as min_t, MAX(TT_TU) as max_t, AVG(TT_TU) as avg_t
        FROM gs_base
        GROUP BY GROUPING SETS (
            (year, bundesland),
            (year, station_name),
            (month, bundesland)
        )
    """
    df_gs = spark.sql(q_sets)
    df_gs.cache()
    df_gs.createOrReplaceTempView("grouping_result")
    # action to materialise the cache
    df_gs.count()
    print(f"Grouping sets computed. Duration: {time.time() - start:.2f}s")

    print("Selection 1: year & federal state")
    spark.sql("SELECT year, bundesland, avg_t FROM grouping_result WHERE station_name IS NULL AND month IS NULL ORDER BY year DESC, bundesland").show(5)
    print("Selection 2: year & station")
    spark.sql("SELECT year, station_name, avg_t FROM grouping_result WHERE bundesland IS NULL AND month IS NULL ORDER BY year DESC, station_name").show(5)
    print("Selection 3: month & federal state (seasonal profile per state)")
    spark.sql("SELECT month, bundesland, avg_t FROM grouping_result WHERE year IS NULL AND station_name IS NULL ORDER BY bundesland, month").show(5)
    input(">> 11c: check the Spark UI (query plans/Storage). Press Enter...")

# --- Task C ---
def create_grouping_sets_view(spark: SparkSession):
    query = """
        WITH base_gs AS (
            SELECT
                d.stationId,
                -- TRIM removes whitespace for clean tables
                TRIM(gs.bundesland) AS bundesland_clean,
                d.TT_TU,
                -- extract year and calendar month (1-12)
                YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS jahr_val,
                MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS monat_val
            FROM german_stations_data d
            JOIN german_stations gs ON d.stationId = gs.stationId
            WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999
        )
        SELECT
            stationId,
            bundesland_clean AS bundesland,
            jahr_val AS jahr,
            monat_val AS monat,
            MIN(TT_TU) AS minTemperatur,
            MAX(TT_TU) AS maxTemperatur,
            AVG(TT_TU) AS avgTemperatur
        FROM base_gs
        GROUP BY GROUPING SETS (
            (bundesland_clean, jahr_val),   -- 1. year and federal state
            (stationId, jahr_val),          -- 2. year and station
            (bundesland_clean, monat_val)   -- 3. month and federal state
        )
    """
    df = spark.sql(query)
    df.cache()
    df.createOrReplaceTempView("tempmma_gs")

def show_seperate_gs(spark: SparkSession, limit: int):
    # Filter: stationId must be NULL, monat must be NULL
    spark.sql("""
        SELECT
            jahr,
            bundesland,
            minTemperatur,
            maxTemperatur,
            ROUND(avgTemperatur, 2) as avgTemperatur
        FROM tempmma_gs
        WHERE bundesland IS NOT NULL
          AND jahr IS NOT NULL
          AND stationId IS NULL
          AND monat IS NULL
        ORDER BY jahr DESC, bundesland ASC
    """).show(limit, truncate=False)

    # Filter: bundesland must be NULL, monat must be NULL
    spark.sql("""
        SELECT
            jahr,
            stationId,
            minTemperatur,
            maxTemperatur,
            ROUND(avgTemperatur, 2) as avgTemperatur
        FROM tempmma_gs
        WHERE stationId IS NOT NULL
          AND jahr IS NOT NULL
          AND bundesland IS NULL
        ORDER BY jahr DESC, stationId ASC
    """).show(limit, truncate=False)

    # Filter: stationId must be NULL, jahr must be NULL
    spark.sql("""
        SELECT
            monat,
            bundesland,
            minTemperatur,
            maxTemperatur,
            ROUND(avgTemperatur, 2) as avgTemperatur
        FROM tempmma_gs
        WHERE bundesland IS NOT NULL
          AND monat IS NOT NULL
          AND jahr IS NULL
        ORDER BY monat ASC, bundesland ASC
    """).show(limit, truncate=False)
# ---------------------------------------------------------
# TASK 12
# ---------------------------------------------------------
def task_12_stocks_analysis(spark: SparkSession):
    print("\n--- Task 12: stocks & portfolio ---")

    # a) First and last date per symbol
    print("a) Min/max date per symbol")
    t0 = time.time()
    q_a = """
        SELECT symbol, MIN(date) as first_date, MAX(date) as last_date
        FROM stocks
        GROUP BY symbol
        ORDER BY symbol
    """
    spark.sql(q_a).show(5)
    print(f"Time a): {time.time()-t0:.2f}s")

    # b) Aggregations for 2009
    print("\nb) High/low/avg close 2009")
    t0 = time.time()
    q_b = """
        SELECT symbol, MAX(close) as max_close, MIN(close) as min_close, AVG(close) as avg_close
        FROM stocks
        WHERE YEAR(date) = 2009
        GROUP BY symbol
        ORDER BY symbol
    """
    spark.sql(q_b).show(5)
    print(f"Time b): {time.time()-t0:.2f}s")

    # c) Lateral view (explode the portfolio holdings)
    print("\nc) Lateral view: stocks held in portfolios")
    t0 = time.time()
    q_c = """
        SELECT
            h.symbol,
            SUM(h.amount) as total_shares,
            COUNT(p.portfolioId) as num_portfolios,
            AVG(h.amount) as avg_per_portfolio
        FROM portfolio p
        LATERAL VIEW explode(holdings) t AS h
        GROUP BY h.symbol
        ORDER BY h.symbol
    """
    spark.sql(q_c).show(5)
    print(f"Time c): {time.time()-t0:.2f}s")

    # d) Symbols not held in any portfolio (anti join)
    print("\nd) Symbols without a portfolio")
    t0 = time.time()
    q_d = """
        SELECT DISTINCT s.symbol
        FROM stocks s
        LEFT ANTI JOIN (
            SELECT DISTINCT h.symbol
            FROM portfolio p
            LATERAL VIEW explode(holdings) t AS h
        ) p_sym ON s.symbol = p_sym.symbol
        ORDER BY s.symbol
    """
    spark.sql(q_d).show(5)
    print(f"Time d): {time.time()-t0:.2f}s")
    input(">> 12 a-d done. Check the Spark UI. Press Enter for e)...")

    # e) Portfolio value at the end of 2010
    print("\ne) Portfolio valuation at the end of 2010")
    t0 = time.time()
    q_last_price = """
        SELECT symbol, close
        FROM (
            SELECT
                symbol,
                close,
                ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY date DESC) as rn
            FROM stocks
            WHERE YEAR(date) = 2010
        ) tmp
        WHERE rn = 1
    """
    spark.sql(q_last_price).createOrReplaceTempView("stocks_2010_end")

    # Step 2: explode the portfolios, join with the price, compute and sum
    q_val = """
        SELECT
            p.portfolioId,
            SUM(h.amount * s.close) as portfolio_value_2010
        FROM portfolio p
        LATERAL VIEW explode(holdings) t AS h
        JOIN stocks_2010_end s ON h.symbol = s.symbol
        GROUP BY p.portfolioId
        ORDER BY p.portfolioId
    """
    spark.sql(q_val).show(5)
    print(f"Time e): {time.time()-t0:.2f}s")
def main(scon, spark):
    init_view_stations(spark)
    # Task 11
    task_11a_rollup(spark, station_name="Kempten")
    task_11b_rank(spark)
    task_11c_groupingsets(spark)
    # Task 12
    init_view_stocks(spark)
    task_12_stocks_analysis(spark)

def main(scon, spark):
    read_parquet_tables(spark)
    # Kempten ID = 2559
    create_mma_rollup(spark, 2559)
    for level in ["years", "quartals", "months", "days"]:
        plot_date_values(spark, level)
    create_tempmonat(spark)
    print("Rankings 2015:")
    rank_temperatures(spark, 18, 2015)
    print("Rankings overall:")
    rank_temperatures(spark, 18)
    create_grouping_sets_view(spark)
    show_seperate_gs(spark, 10)

if __name__ == "__main__":
    main(scon, spark)
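
# Hedged sketch (not called anywhere): sparkstart is a course-specific helper assumed to provide
# the connected scon and spark objects. If it is not available, a session could be built directly;
# the app name below is a placeholder, not a value taken from the original setup.
def build_local_session() -> SparkSession:
    spark = SparkSession.builder.appName("temperature-rollup-exercises").getOrCreate()
    return spark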