Compare commits


1 commit

Commit 1b2de95b2e ("12"), 2025-12-11 21:30:51 +01:00
2 changed files with 356 additions and 599 deletions

Changed file 1:

@@ -1,366 +1,276 @@
Removed (old version):

from __future__ import annotations

from sparkstart import scon, spark
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt

HDFSPATH = "hdfs://193.174.205.250:54310/"


def read_parquet_tables(spark: SparkSession) -> None:
    """Load station master data and hourly measurements from parquet if needed."""
    stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
    products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
    stations_df = spark.read.parquet(stations_path)
    stations_df.createOrReplaceTempView("german_stations")
    products_df = spark.read.parquet(products_path)
    products_df.createOrReplaceTempView("german_stations_data")
    stations_df.cache()
    products_df.cache()

Added (new version):

from sparkstart import scon, spark
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import pandas as pd
import time  # needed for time.time() in task_11a_rollup below

HDFSPATH = "hdfs://193.174.205.250:54310/"
HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/"


def read_parquets(spark: SparkSession):
    stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
    products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
    stations_df = spark.read.parquet(stations_path)
    stations_df.createOrReplaceTempView("german_stations")
    stations_df.cache()
    products_df = spark.read.parquet(products_path)
    products_df.createOrReplaceTempView("german_stations_data")
    products_df.cache()

def task_11a_rollup(spark: SparkSession, station_name="Kempten"):
print(f"\n--- Aufgabe 11a: Rollup & Plotting für {station_name} ---")
start_time = time.time()
# 1. Find the station ID
# Case-insensitive search
sid_df = spark.sql(f"SELECT stationId FROM german_stations WHERE lower(station_name) LIKE '%{station_name.lower()}%'")
try:
sid = sid_df.collect()[0]['stationId']
print(f"Station found: {station_name} -> ID {sid}")
except IndexError:
print(f"Station {station_name} nicht gefunden.")
return
# 2. Rollup Query vorbereiten
# FIX: Parse string date 'YYYYMMDD' to real DATE object first
q_prep = f"""
SELECT
YEAR(TO_DATE(date, 'yyyyMMdd')) as yr,
QUARTER(TO_DATE(date, 'yyyyMMdd')) as qt,
MONTH(TO_DATE(date, 'yyyyMMdd')) as mo,
DAY(TO_DATE(date, 'yyyyMMdd')) as da,
TT_TU
FROM german_stations_data
WHERE stationId = {sid}
AND TT_TU IS NOT NULL
AND TT_TU > -50
AND TT_TU < 60
"""
spark.sql(q_prep).createOrReplaceTempView("data_prep")
# 3. Rollup Execution
# Note: We use string construction for quarters/months to ensure we get a valid date string for plotting
q_rollup = """
SELECT
yr, qt, mo, da,
MIN(TT_TU) as min_temp,
MAX(TT_TU) as max_temp,
AVG(TT_TU) as avg_temp,
-- Construct dates for plotting (handling the NULLs from ROLLUP)
-- For Quarter: Use 1st month of quarter
DATE(concat_ws('-', yr, cast(qt*3-2 as int), '01')) as qt_date,
-- For Month: Use 1st day of month
MAKE_DATE(yr, mo, 1) as mo_date,
-- For Year: Use Jan 1st
MAKE_DATE(yr, 1, 1) as yr_date,
-- For Day: Use actual date
MAKE_DATE(yr, mo, da) as da_date
FROM data_prep
GROUP BY ROLLUP(yr, qt, mo, da)
"""
df_rollup = spark.sql(q_rollup)
df_rollup.cache()
df_rollup.createOrReplaceTempView("station_rollup")
# Trigger Action
count = df_rollup.count()
print(f"Rollup berechnet. Zeilen: {count}. Dauer: {time.time() - start_time:.2f}s")
# --- PLOTTING ---
# Plot 1: Tageswerte (letzte 3 Jahre)
# Filter: All levels must be present (not null)
q_days = """
SELECT da_date as date, avg_temp
FROM station_rollup
WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NOT NULL
AND yr >= (SELECT MAX(yr) - 2 FROM station_rollup WHERE yr IS NOT NULL)
ORDER BY date
"""
pdf_days = spark.sql(q_days).toPandas()
if pdf_days.empty:
print("Warnung: Keine Daten für Tages-Plot gefunden.")
else:
plt.figure(1, figsize=(10, 5))
plt.plot(pdf_days['date'], pdf_days['avg_temp'], label='Daily Avg', linewidth=0.5)
plt.title(f"{station_name}: Daily Average (Last 3 Years)")
plt.xlabel('Date')
plt.ylabel('Temp °C')
plt.tight_layout()
plt.show()
# Plot 2: monthly values (10-20 years)
# Filter: Day is NULL (aggregation level), but Month is NOT NULL
q_months = """
SELECT mo_date as date, avg_temp
FROM station_rollup
WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NULL
AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL)
ORDER BY date
"""
pdf_months = spark.sql(q_months).toPandas()
if not pdf_months.empty:
plt.figure(2, figsize=(10, 5))
plt.plot(pdf_months['date'], pdf_months['avg_temp'], color='green', label='Monthly Avg')
plt.title(f"{station_name}: Monthly Average (Last 20 Years)")
plt.xlabel('Date')
plt.ylabel('Temp °C')
plt.tight_layout()
plt.show()
# Plot 3: quarterly values
# Filter: Month is NULL, Quarter is NOT NULL
q_quarters = """
SELECT qt_date as date, avg_temp
FROM station_rollup
WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NULL AND da IS NULL
AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL)
ORDER BY date
"""
pdf_quarters = spark.sql(q_quarters).toPandas()
if not pdf_quarters.empty:
plt.figure(3, figsize=(10, 5))
plt.plot(pdf_quarters['date'], pdf_quarters['avg_temp'], color='orange', marker='o', linestyle='-', label='Quarterly Avg')
plt.title(f"{station_name}: Quarterly Average (Last 20 Years)")
plt.tight_layout()
plt.show()
# Plot 4: yearly values
# Filter: Quarter is NULL, Year is NOT NULL
q_years = """
SELECT yr_date as date, min_temp, max_temp, avg_temp
FROM station_rollup
WHERE yr IS NOT NULL AND qt IS NULL AND mo IS NULL AND da IS NULL
AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL)
ORDER BY date
"""
pdf_years = spark.sql(q_years).toPandas()
if not pdf_years.empty:
plt.figure(4, figsize=(10, 5))
plt.plot(pdf_years['date'], pdf_years['max_temp'], color='red', label='Max')
plt.plot(pdf_years['date'], pdf_years['avg_temp'], color='black', label='Avg')
plt.plot(pdf_years['date'], pdf_years['min_temp'], color='blue', label='Min')
plt.title(f"{station_name}: Yearly Aggregates (Last 20 Years)")
plt.legend()
plt.tight_layout()
plt.show()
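
The four plots above select a ROLLUP level by checking which of yr/qt/mo/da come back NULL, which only stays unambiguous as long as the grouped columns themselves never contain NULL. Spark also exposes GROUPING()/GROUPING_ID() for this purpose; a minimal, self-contained sketch on toy data (local session and column names are illustrative, not the weather schema):

# Minimal sketch: identify ROLLUP aggregation levels with grouping_id() instead of NULL checks.
from pyspark.sql import SparkSession, functions as F

demo_spark = SparkSession.builder.master("local[1]").appName("rollup-demo").getOrCreate()
demo = demo_spark.createDataFrame(
    [(2015, 1, -3.0), (2015, 1, 1.0), (2015, 2, 4.0), (2016, 1, 0.5)],
    ["yr", "mo", "temp"],
)
agg = demo.rollup("yr", "mo").agg(F.avg("temp").alias("avg_temp"), F.grouping_id().alias("gid"))
# gid = 0 -> grouped by (yr, mo); gid = 1 -> per-year subtotal; gid = 3 -> grand total
agg.filter("gid = 1").orderBy("yr").show()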

def task_11b_rank(spark: SparkSession):
    print("\n--- Task 11b: tempmonat ranking ---")

    q_tempmonat = """
        SELECT
            d.stationId,
            s.station_name,
            SUBSTR(CAST(d.date AS STRING), 1, 4) as year,
            SUBSTR(CAST(d.date AS STRING), 5, 2) as month,  -- month = chars 5-6 of the 'yyyyMMdd' date string
            MIN(d.TT_TU) as min_t,
            MAX(d.TT_TU) as max_t,
            AVG(d.TT_TU) as avg_t
        FROM german_stations_data d
        JOIN german_stations s ON d.stationId = s.stationId
        WHERE d.TT_TU IS NOT NULL AND d.TT_TU > -50
        GROUP BY d.stationId, s.station_name, year, month
    """
    df_tm = spark.sql(q_tempmonat)
    df_tm.createOrReplaceTempView("tempmonat")

    # 1. Ranking partitioned by month within the year 2015
    print(" > Computing ranking for 2015 (partitioned by month)...")
    q_rank_2015 = """
        SELECT
            month, station_name, min_t,
            RANK() OVER (PARTITION BY month ORDER BY min_t ASC) as rank_min,
            RANK() OVER (PARTITION BY month ORDER BY max_t ASC) as rank_max,
            RANK() OVER (PARTITION BY month ORDER BY avg_t ASC) as rank_avg
        FROM tempmonat
        WHERE year = '2015'
        ORDER BY rank_min, month
    """
    spark.sql(q_rank_2015).show(10)

    # 2. Global ranking (across all months/years)
    print(" > Computing global ranking (coldest months of all time)...")
    q_rank_global = """
        SELECT
            year, month, station_name, min_t,
            RANK() OVER (ORDER BY min_t ASC) as rank_min,
            RANK() OVER (ORDER BY max_t ASC) as rank_max,
            RANK() OVER (ORDER BY avg_t ASC) as rank_avg
        FROM tempmonat
        ORDER BY rank_min
    """
    spark.sql(q_rank_global).show(10)

    print("11b: done.")


def task_11c_groupingsets(spark: SparkSession):
    print("\n--- Task 11c: grouping sets ---")

    q_prep = """
        SELECT
            CAST(SUBSTR(CAST(d.date AS STRING), 1, 4) AS INT) as year,
            CAST(SUBSTR(CAST(d.date AS STRING), 5, 2) AS INT) as month,  -- month = chars 5-6 of 'yyyyMMdd'
            s.station_name,
            s.bundesland,
            d.TT_TU
        FROM german_stations_data d
        JOIN german_stations s ON d.stationId = s.stationId
        WHERE d.TT_TU > -50
    """
    spark.sql(q_prep).createOrReplaceTempView("gs_base")

    q_sets = """
        SELECT
            year,
            month,
            bundesland,
            station_name,
            MIN(TT_TU) as min_t, MAX(TT_TU) as max_t, AVG(TT_TU) as avg_t
        FROM gs_base
        GROUP BY GROUPING SETS (
            (year, bundesland),
            (year, station_name),
            (month, bundesland)
        )
    """
    df_gs = spark.sql(q_sets)
    df_gs.cache()
    df_gs.createOrReplaceTempView("grouping_result")

    # Action to materialize the cache
    df_gs.count()
    print("Grouping sets computed.")

    print("Selection 1: year & Bundesland")
    spark.sql("SELECT year, bundesland, avg_t FROM grouping_result WHERE station_name IS NULL AND month IS NULL ORDER BY year DESC, bundesland").show(5)

    print("Selection 2: year & station")
    spark.sql("SELECT year, station_name, avg_t FROM grouping_result WHERE bundesland IS NULL AND month IS NULL ORDER BY year DESC, station_name").show(5)

    print("Selection 3: month & Bundesland (seasonal profile per state)")
    spark.sql("SELECT month, bundesland, avg_t FROM grouping_result WHERE year IS NULL AND station_name IS NULL ORDER BY bundesland, month").show(5)

Removed (old version):

def _escape_like(value: str) -> str:
    """Escape single quotes for safe SQL literal usage."""
    return value.replace("'", "''")


def resolve_station_id(spark: SparkSession, station_identifier) -> int:
    """Resolve station id either from int input or fuzzy name search."""
    if isinstance(station_identifier, int):
        return station_identifier
    if isinstance(station_identifier, str) and station_identifier.strip().isdigit():
        return int(station_identifier.strip())
    if isinstance(station_identifier, str):
        needle = _escape_like(station_identifier.lower())
        q = (
            "SELECT stationId FROM german_stations "
            f"WHERE lower(station_name) LIKE '%{needle}%' ORDER BY station_name LIMIT 1"
        )
        result = spark.sql(q).collect()
        if not result:
            raise ValueError(f"No station found for pattern '{station_identifier}'")
        return int(result[0]["stationId"])
    raise ValueError("station_identifier must be int or str")


def build_station_rollup_for_station(spark: SparkSession, station_identifier) -> None:
    """Create rollup view with min/max/avg per hour/day/month/quarter/year."""
    station_id = resolve_station_id(spark, station_identifier)
    q = f"""
        WITH base AS (
            SELECT
                d.stationId,
                gs.station_name,
                TO_TIMESTAMP(CONCAT(d.date, LPAD(CAST(d.hour AS STRING), 2, '0')), 'yyyyMMddHH') AS hour_ts,
                TO_DATE(d.date, 'yyyyMMdd') AS day_date,
                MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS month_in_year,
                QUARTER(TO_DATE(d.date, 'yyyyMMdd')) AS quarter_in_year,
                YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS year_value,
                d.TT_TU AS temperature
            FROM german_stations_data d
            JOIN german_stations gs ON d.stationId = gs.stationId
            WHERE d.stationId = {station_id}
              AND d.TT_TU IS NOT NULL
              AND d.TT_TU <> -999
        ),
        rollup_base AS (
            SELECT
                stationId,
                station_name,
                hour_ts,
                day_date,
                month_in_year,
                quarter_in_year,
                year_value,
                MIN(temperature) AS min_temp,
                MAX(temperature) AS max_temp,
                AVG(temperature) AS avg_temp
            FROM base
            GROUP BY stationId, station_name, ROLLUP(year_value, quarter_in_year, month_in_year, day_date, hour_ts)
        )
        SELECT
            stationId,
            station_name,
            hour_ts,
            day_date,
            month_in_year,
            quarter_in_year,
            year_value,
            CASE WHEN month_in_year IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-', LPAD(CAST(month_in_year AS STRING), 2, '0'), '-01')) END AS month_start_date,
            CASE WHEN quarter_in_year IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-', LPAD(CAST(quarter_in_year * 3 - 2 AS STRING), 2, '0'), '-01')) END AS quarter_start_date,
            CASE WHEN year_value IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-01-01')) END AS year_start_date,
            min_temp,
            max_temp,
            avg_temp
        FROM rollup_base
    """
    rollup_df = spark.sql(q)
    rollup_df.cache()
    rollup_df.createOrReplaceTempView("station_rollup")


def _year_window(spark: SparkSession, years_back: int, station_id: int) -> tuple[int, int] | None:
    stats = spark.sql(
        f"SELECT MIN(year_value) AS min_year, MAX(year_value) AS max_year FROM station_rollup WHERE year_value IS NOT NULL AND stationId = {station_id}"
    ).collect()
    if not stats or stats[0]["max_year"] is None:
        return None
    min_year = int(stats[0]["min_year"])
    max_year = int(stats[0]["max_year"])
    start_year = max(min_year, max_year - years_back + 1)
    return start_year, max_year


def plot_station_rollup_levels(
    spark: SparkSession,
    station_identifier,
    day_span_years: int = 3,
    agg_span_years: int = 15,
) -> None:
    """Plot day, month, quarter, and year aggregates for the given station."""
    station_id = resolve_station_id(spark, station_identifier)
    needs_refresh = not spark.catalog.tableExists("station_rollup")
    if not needs_refresh:
        count = spark.sql(
            f"SELECT COUNT(*) AS cnt FROM station_rollup WHERE stationId = {station_id}"
        ).collect()[0]["cnt"]
        needs_refresh = count == 0
    if needs_refresh:
        build_station_rollup_for_station(spark, station_id)

    day_window = _year_window(spark, day_span_years, station_id)
    if day_window is None:
        print("No data available for plotting")
        return
    month_window = _year_window(spark, agg_span_years, station_id)
    if month_window is None:
        print("No aggregated window available")
        return

    def _plot(query: str, figure_idx: int, title: str, x_col: str = "bucket_date") -> None:
        pdf = spark.sql(query).toPandas()
        if pdf.empty:
            print(f"No data for {title}")
            return
        plt.figure(num=figure_idx)
        plt.clf()
        metrics = [
            ("min_temp", "Min", "#1f77b4"),
            ("avg_temp", "Avg", "#ff7f0e"),
            ("max_temp", "Max", "#2ca02c"),
        ]
        for col, label, color in metrics:
            if col in pdf:
                plt.plot(pdf[x_col], pdf[col], label=label, color=color)
        plt.title(title)
        plt.xlabel("Date")
        plt.ylabel("Temperature (°C)")
        plt.legend()
        plt.tight_layout()
        plt.show()

    day_start, day_end = day_window
    q_day = f"""
        SELECT day_date AS bucket_date, min_temp, avg_temp, max_temp
        FROM station_rollup
        WHERE stationId = {station_id}
          AND hour_ts IS NULL
          AND day_date IS NOT NULL
          AND year_value BETWEEN {day_start} AND {day_end}
        ORDER BY bucket_date
    """
    _plot(q_day, 1, f"Daily averages {day_start}-{day_end}")

    agg_start, agg_end = month_window
    q_month = f"""
        SELECT month_start_date AS bucket_date, min_temp, avg_temp, max_temp
        FROM station_rollup
        WHERE stationId = {station_id}
          AND day_date IS NULL
          AND month_in_year IS NOT NULL
          AND year_value BETWEEN {agg_start} AND {agg_end}
        ORDER BY bucket_date
    """
    _plot(q_month, 2, f"Monthly averages {agg_start}-{agg_end}")

    q_quarter = f"""
        SELECT quarter_start_date AS bucket_date, min_temp, avg_temp, max_temp
        FROM station_rollup
        WHERE stationId = {station_id}
          AND month_in_year IS NULL
          AND quarter_in_year IS NOT NULL
          AND year_value BETWEEN {agg_start} AND {agg_end}
        ORDER BY bucket_date
    """
    _plot(q_quarter, 3, f"Quarterly averages {agg_start}-{agg_end}")

    q_year = f"""
        SELECT year_start_date AS bucket_date, min_temp, avg_temp, max_temp
        FROM station_rollup
        WHERE stationId = {station_id}
          AND quarter_in_year IS NULL
          AND year_value IS NOT NULL
        ORDER BY bucket_date
    """
    _plot(q_year, 4, "Yearly averages")
def create_tempmonat(spark: SparkSession) -> None:
"""Create cached temp table tempmonat with monthly aggregates per station."""
q = """
SELECT
d.stationId,
gs.station_name,
YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS year_value,
MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS month_value,
MIN(d.TT_TU) AS min_temp,
MAX(d.TT_TU) AS max_temp,
AVG(d.TT_TU) AS avg_temp
FROM german_stations_data d
JOIN german_stations gs ON d.stationId = gs.stationId
WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999
GROUP BY d.stationId, gs.station_name, YEAR(TO_DATE(d.date, 'yyyyMMdd')), MONTH(TO_DATE(d.date, 'yyyyMMdd'))
"""
monthly_df = spark.sql(q)
monthly_df.cache()
monthly_df.createOrReplaceTempView("tempmonat")
def rank_coldest_per_month_2015(spark: SparkSession):
"""Rank stations by coldest values per month for 2015 using tempmonat."""
return spark.sql(
"""
SELECT
stationId,
station_name,
year_value,
month_value,
min_temp,
max_temp,
avg_temp,
RANK() OVER (PARTITION BY month_value ORDER BY min_temp ASC) AS rank_min,
RANK() OVER (PARTITION BY month_value ORDER BY max_temp ASC) AS rank_max,
RANK() OVER (PARTITION BY month_value ORDER BY avg_temp ASC) AS rank_avg
FROM tempmonat
WHERE year_value = 2015
ORDER BY rank_min, month_value
"""
)
def rank_coldest_overall(spark: SparkSession):
"""Rank stations by coldest values over all months/years (no partition)."""
return spark.sql(
"""
SELECT
stationId,
station_name,
year_value,
month_value,
min_temp,
max_temp,
avg_temp,
RANK() OVER (ORDER BY min_temp ASC) AS rank_min,
RANK() OVER (ORDER BY max_temp ASC) AS rank_max,
RANK() OVER (ORDER BY avg_temp ASC) AS rank_avg
FROM tempmonat
ORDER BY rank_min
"""
)
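
The same per-month ranking can be written with the DataFrame API instead of SQL; a minimal sketch, assuming the cached tempmonat view registered above:

from pyspark.sql import Window, functions as F

# Sketch: DataFrame-API equivalent of the per-month RANK() query above.
by_month = Window.partitionBy("month_value").orderBy(F.col("min_temp").asc())
ranked_2015 = (
    spark.table("tempmonat")
    .where(F.col("year_value") == 2015)
    .withColumn("rank_min", F.rank().over(by_month))
    .orderBy("rank_min", "month_value")
)
ranked_2015.show(36, truncate=False)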
def create_grouping_sets_overview(spark: SparkSession) -> None:
"""Compute grouping sets for requested aggregations and cache the result."""
q = """
WITH base AS (
SELECT
YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS year_value,
MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS month_value,
gs.bundesland,
gs.stationId,
gs.station_name,
d.TT_TU AS temperature
FROM german_stations_data d
JOIN german_stations gs ON d.stationId = gs.stationId
WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999
)
SELECT
year_value,
month_value,
bundesland,
stationId,
station_name,
MIN(temperature) AS min_temp,
MAX(temperature) AS max_temp,
AVG(temperature) AS avg_temp
FROM base
GROUP BY GROUPING SETS (
(year_value, bundesland),
(year_value, stationId, station_name, bundesland),
(month_value, bundesland)
)
"""
grouped_df = spark.sql(q)
grouped_df.cache()
grouped_df.createOrReplaceTempView("grouping_sets_stats")
def select_year_bundesland(spark: SparkSession):
return spark.sql(
"""
SELECT year_value, bundesland, min_temp, max_temp, avg_temp
FROM grouping_sets_stats
WHERE bundesland IS NOT NULL AND month_value IS NULL AND stationId IS NULL
ORDER BY year_value, bundesland
"""
)
def select_year_station(spark: SparkSession):
return spark.sql(
"""
SELECT year_value, stationId, station_name, min_temp, max_temp, avg_temp
FROM grouping_sets_stats
WHERE stationId IS NOT NULL AND month_value IS NULL
ORDER BY year_value, stationId
"""
)
def select_month_bundesland(spark: SparkSession):
return spark.sql(
"""
SELECT month_value, bundesland, min_temp, max_temp, avg_temp
FROM grouping_sets_stats
WHERE month_value IS NOT NULL AND year_value IS NULL
ORDER BY month_value, bundesland
"""
)
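
The three selector queries above infer the grouping set from which columns come back NULL; GROUPING_ID() makes that explicit in the result itself. A minimal, self-contained sketch on toy data (local session and values are illustrative):

from pyspark.sql import SparkSession

# Sketch: tag each GROUPING SETS row with GROUPING_ID() instead of inferring the set from NULLs.
demo_spark = SparkSession.builder.master("local[1]").appName("gsets-demo").getOrCreate()
demo_spark.createDataFrame(
    [(2015, 1, "Bayern", "Kempten", -3.0), (2015, 1, "Bayern", "Hof", -7.0)],
    ["year_value", "month_value", "bundesland", "station_name", "t"],
).createOrReplaceTempView("gs_demo")
demo_spark.sql("""
    SELECT year_value, month_value, bundesland, station_name,
           AVG(t) AS avg_temp,
           GROUPING_ID() AS gid  -- one bit per grouping column, identifies the grouping set
    FROM gs_demo
    GROUP BY GROUPING SETS ((year_value, bundesland), (year_value, station_name), (month_value, bundesland))
    ORDER BY gid
""").show()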
print("Auswahl 3: Monat & Bundesland (Jahreszeitlicher Verlauf je Land)")
spark.sql("SELECT month, bundesland, avg_t FROM grouping_result WHERE year IS NULL AND station_name IS NULL ORDER BY bundesland, month").show(5)

def main(scon, spark):
    read_parquet_tables(spark)
    build_station_rollup_for_station(spark, "kempten")
    plot_station_rollup_levels(spark, "kempten")
    create_tempmonat(spark)
    print("Rankings for 2015 per month:")
    rank_coldest_per_month_2015(spark).show(36, truncate=False)
    print("Overall rankings:")
    rank_coldest_overall(spark).show(36, truncate=False)
    create_grouping_sets_overview(spark)
    print("Year vs Bundesland:")
    select_year_bundesland(spark).show(20, truncate=False)
    print("Year vs station:")
    select_year_station(spark).show(20, truncate=False)
    print("Month vs Bundesland:")
    select_month_bundesland(spark).show(20, truncate=False)


if __name__ == "__main__":
    main(scon, spark)

Added (new version):

def main(scon, spark):
    read_parquets(spark)

    # Task 11
    task_11a_rollup(spark, station_name="Kempten")
    task_11b_rank(spark)
    task_11c_groupingsets(spark)


if __name__ == '__main__':
    main(scon, spark)

Changed file 2:

@@ -1,276 +1,123 @@
Removed (old version):

from __future__ import annotations

from typing import Iterable, Sequence

from pyspark.sql import SparkSession, functions as F, types as T
from sparkstart import scon, spark

HDFSPATH = "hdfs://193.174.205.250:54310/"

Added (new version):

from sparkstart import scon, spark
from pyspark.sql import SparkSession
import time
import matplotlib.pyplot as plt
import pandas as pd

HDFSPATH_STATIONS = "hdfs://193.174.205.250:54310/home/heiserervalentin/"
HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/"


def init_view_stocks(spark):
    """Load the stocks data for task 12."""
    # Note: adjust the paths if they are located differently on the cluster
    spark.read.parquet(HDFSPATH_STOCKS + "stocks.parquet").createOrReplaceTempView("stocks")
    spark.read.parquet(HDFSPATH_STOCKS + "portfolio.parquet").createOrReplaceTempView("portfolio")


# ---------------------------------------------------------
# TASK 12
# ---------------------------------------------------------
_DATE_FALLBACK_EXPR = "COALESCE(date_value, TO_DATE(date_str), TO_DATE(date_str, 'yyyyMMdd'))" def task_12_stocks_analysis(spark: SparkSession):
print("\n--- Aufgabe 12: Stocks & Portfolio ---")
# a) Erstes und letztes Datum je Symbol
print("a) Min/Max Datum pro Symbol")
t0 = time.time()
q_a = """
SELECT symbol, MIN(date) as first_date, MAX(date) as last_date
FROM stocks
GROUP BY symbol
ORDER BY symbol
"""
spark.sql(q_a).show(5)
print(f"Zeit a): {time.time()-t0:.2f}s")
def _resolve_column_name(columns: Sequence[str], candidates: Iterable[str]) -> str: # b) Aggregationen 2009
print("\nb) High/Low/Avg Close 2009")
t0 = time.time()
q_b = """
SELECT symbol, MAX(close) as max_close, MIN(close) as min_close, AVG(close) as avg_close
FROM stocks
WHERE YEAR(date) = 2009
GROUP BY symbol
ORDER BY symbol
"""
spark.sql(q_b).show(5)
print(f"Zeit b): {time.time()-t0:.2f}s")
lowered = {col.lower(): col for col in columns} # c) Lateral View (Explode Portfolio)
for candidate in candidates: print("\nc) Lateral View: Aktien in Portfolios")
match = lowered.get(candidate.lower()) t0 = time.time()
if match:
return match
raise ValueError(f"None of the candidate columns {list(candidates)} exist in {columns}")
q_c = """
SELECT
h.symbol,
SUM(h.amount) as total_shares,
COUNT(p.portfolioId) as num_portfolios,
AVG(h.amount) as avg_per_portfolio
FROM portfolio p
LATERAL VIEW explode(holdings) t AS h
GROUP BY h.symbol
ORDER BY h.symbol
"""
spark.sql(q_c).show(5)
print(f"Zeit c): {time.time()-t0:.2f}s")
def _normalize_stocks_view(spark: SparkSession) -> None: # d) Symbole in keinem Portfolio (Anti Join)
print("\nd) Symbole ohne Portfolio")
t0 = time.time()
q_d = """
SELECT DISTINCT s.symbol
FROM stocks s
LEFT ANTI JOIN (
SELECT DISTINCT h.symbol
FROM portfolio p
LATERAL VIEW explode(holdings) t AS h
) p_sym ON s.symbol = p_sym.symbol
ORDER BY s.symbol
"""
spark.sql(q_d).show(5)
print(f"Zeit d): {time.time()-t0:.2f}s")
stocks_path = HDFSPATH + "stocks/stocks.parquet" input(">> 12 a-d fertig. Check UI. Enter für e)...")
stocks_df = spark.read.parquet(stocks_path)
symbol_col = _resolve_column_name(stocks_df.columns, ("symbol", "ticker")) # e) Portfolio Wert Ende 2010
date_col = _resolve_column_name(stocks_df.columns, ("date", "pricedate", "dt")) print("\ne) Portfolio Bewertung Ende 2010")
close_col = _resolve_column_name(stocks_df.columns, ("close", "closeprice", "closingprice")) t0 = time.time()
stocks_df = ( q_last_price = """
stocks_df SELECT symbol, close
.select( FROM (
F.col(symbol_col).alias("symbol"), SELECT
F.col(date_col).alias("raw_date"), symbol,
F.col(close_col).alias("close_raw"), close,
) ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY date DESC) as rn
.withColumn("date_str", F.col("raw_date").cast("string")) FROM stocks
) WHERE YEAR(date) = 2010
) tmp
WHERE rn = 1
"""
spark.sql(q_last_price).createOrReplaceTempView("stocks_2010_end")
date_candidates = [ # Schritt 2: Portfolio explodieren, mit Preis joinen, berechnen, summieren
F.col("raw_date").cast("date"), q_val = """
F.to_date("raw_date"), SELECT
F.to_date("date_str"), p.portfolioId,
F.to_date("date_str", "yyyyMMdd"), SUM(h.amount * s.close) as portfolio_value_2010
F.to_date("date_str", "MM/dd/yyyy"), FROM portfolio p
] LATERAL VIEW explode(holdings) t AS h
JOIN stocks_2010_end s ON h.symbol = s.symbol
stocks_df = ( GROUP BY p.portfolioId
stocks_df ORDER BY p.portfolioId
.withColumn("date_value", F.coalesce(*date_candidates)) """
.withColumn("year_value", F.substring("date_str", 1, 4).cast("int")) spark.sql(q_val).show(5)
.withColumn("close_value", F.col("close_raw").cast("double")) print(f"Zeit e): {time.time()-t0:.2f}s")
.select("symbol", "date_value", "date_str", "year_value", "close_value")
)
stocks_df.cache()
stocks_df.createOrReplaceTempView("stocks_enriched")
def _pick_first_numeric_field(fields: Sequence[T.StructField]) -> str:
numeric_types = (
T.ByteType,
T.ShortType,
T.IntegerType,
T.LongType,
T.FloatType,
T.DoubleType,
T.DecimalType,
)
for field in fields:
if isinstance(field.dataType, numeric_types):
return field.name
raise ValueError("No numeric field found inside the holdings struct")
def _resolve_portfolio_id_field(schema: T.StructType) -> str:
priority = ("portfolio_id", "portfolioid", "id")
lowered = {field.name.lower(): field.name for field in schema.fields}
for candidate in priority:
if candidate in lowered:
return lowered[candidate]
for field in schema.fields:
if not isinstance(field.dataType, (T.ArrayType, T.MapType)):
return field.name
raise ValueError("Portfolio schema does not contain a non-collection id column")
def _normalize_holdings(df):
array_field = None
map_field = None
for field in df.schema.fields:
if isinstance(field.dataType, T.ArrayType) and isinstance(field.dataType.elementType, T.StructType):
array_field = field
break
if isinstance(field.dataType, T.MapType) and isinstance(field.dataType.keyType, T.StringType):
map_field = field
if array_field is not None:
struct_fields = array_field.dataType.elementType.fields
symbol_field = _resolve_column_name([f.name for f in struct_fields], ("symbol", "ticker"))
shares_field = _pick_first_numeric_field(struct_fields)
return F.expr(
f"transform(`{array_field.name}`, x -> named_struct('symbol', x.`{symbol_field}`, 'shares', CAST(x.`{shares_field}` AS DOUBLE)))"
)
if map_field is not None and isinstance(map_field.dataType.valueType, (T.IntegerType, T.LongType, T.FloatType, T.DoubleType, T.DecimalType)):
return F.expr(
f"transform(map_entries(`{map_field.name}`), x -> named_struct('symbol', x.key, 'shares', CAST(x.value AS DOUBLE)))"
)
raise ValueError("Could not locate holdings column (array<struct> or map) in portfolio data")
def _normalize_portfolio_view(spark: SparkSession) -> None:
portfolio_path = HDFSPATH + "stocks/portfolio.parquet"
portfolio_df = spark.read.parquet(portfolio_path)
id_col = _resolve_portfolio_id_field(portfolio_df.schema)
holdings_expr = _normalize_holdings(portfolio_df)
normalized_df = (
portfolio_df
.select(
F.col(id_col).alias("portfolio_id"),
holdings_expr.alias("holdings"),
)
)
normalized_df.cache()
normalized_df.createOrReplaceTempView("portfolio")
spark.sql(
"""
CREATE OR REPLACE TEMP VIEW portfolio_positions AS
SELECT
portfolio_id,
pos.symbol AS symbol,
pos.shares AS shares
FROM portfolio
LATERAL VIEW explode(holdings) exploded AS pos
"""
)
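
The portfolio_positions view above flattens the holdings array with LATERAL VIEW explode; functions.explode is the DataFrame-API counterpart. A minimal, self-contained sketch with a toy portfolio schema (local session and field names are illustrative):

from pyspark.sql import SparkSession, functions as F

# Sketch: explode an array<struct> holdings column with the DataFrame API.
demo_spark = SparkSession.builder.master("local[1]").appName("explode-demo").getOrCreate()
toy_portfolio = demo_spark.createDataFrame(
    [(1, [("AAPL", 10.0), ("IBM", 5.0)]), (2, [("IBM", 2.0)])],
    "portfolio_id INT, holdings ARRAY<STRUCT<symbol: STRING, shares: DOUBLE>>",
)
positions = (
    toy_portfolio
    .select("portfolio_id", F.explode("holdings").alias("pos"))
    .select("portfolio_id", "pos.symbol", "pos.shares")
)
positions.show()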
def register_base_views(spark: SparkSession) -> None:
_normalize_stocks_view(spark)
_normalize_portfolio_view(spark)
def query_first_and_last_listing(spark: SparkSession):
q = f"""
SELECT
symbol,
MIN({_DATE_FALLBACK_EXPR}) AS first_listing,
MAX({_DATE_FALLBACK_EXPR}) AS last_listing
FROM stocks_enriched
WHERE symbol IS NOT NULL
GROUP BY symbol
ORDER BY symbol
"""
return spark.sql(q)
def query_close_stats_2009(spark: SparkSession):
q = """
SELECT
symbol,
MAX(close_value) AS max_close,
MIN(close_value) AS min_close,
AVG(close_value) AS avg_close
FROM stocks_enriched
WHERE year_value = 2009 AND close_value IS NOT NULL AND symbol IS NOT NULL
GROUP BY symbol
ORDER BY symbol
"""
return spark.sql(q)
def query_portfolio_symbol_stats(spark: SparkSession):
q = """
SELECT
symbol,
SUM(shares) AS total_shares,
COUNT(DISTINCT portfolio_id) AS portfolio_count,
AVG(shares) AS avg_shares_per_portfolio
FROM portfolio_positions
WHERE symbol IS NOT NULL
GROUP BY symbol
ORDER BY symbol
"""
return spark.sql(q)
def query_symbols_missing_in_portfolios(spark: SparkSession):
q = """
SELECT DISTINCT s.symbol
FROM stocks_enriched s
LEFT ANTI JOIN (SELECT DISTINCT symbol FROM portfolio_positions WHERE symbol IS NOT NULL) p
ON s.symbol = p.symbol
WHERE s.symbol IS NOT NULL
ORDER BY s.symbol
"""
return spark.sql(q)
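
The same anti join is available directly on DataFrames via the left_anti join type; a minimal sketch, assuming the stocks_enriched and portfolio_positions views from register_base_views exist:

# Sketch: DataFrame-API left_anti join, equivalent to the LEFT ANTI JOIN above.
held_symbols = spark.table("portfolio_positions").select("symbol").distinct()
unheld_symbols = (
    spark.table("stocks_enriched")
    .select("symbol").distinct()
    .join(held_symbols, on="symbol", how="left_anti")
    .orderBy("symbol")
)
unheld_symbols.show(20, truncate=False)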
def query_portfolio_values_2010(spark: SparkSession):
q = f"""
WITH quotes_2010 AS (
SELECT
symbol,
close_value,
ROW_NUMBER() OVER (
PARTITION BY symbol
ORDER BY {_DATE_FALLBACK_EXPR} DESC, date_str DESC
) AS rn
FROM stocks_enriched
WHERE year_value = 2010 AND symbol IS NOT NULL AND close_value IS NOT NULL
),
last_quotes AS (
SELECT symbol, close_value
FROM quotes_2010
WHERE rn = 1
),
portfolio_values AS (
SELECT
pp.portfolio_id,
SUM(pp.shares * lq.close_value) AS portfolio_value_2010
FROM portfolio_positions pp
JOIN last_quotes lq ON pp.symbol = lq.symbol
GROUP BY pp.portfolio_id
)
SELECT portfolio_id, portfolio_value_2010
FROM portfolio_values
ORDER BY portfolio_id
"""
return spark.sql(q)
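
An alternative to the ROW_NUMBER() window for picking the last 2010 quote per symbol is the MAX_BY aggregate (Spark 3.0+); a minimal sketch against the same stocks_enriched view:

# Sketch: last close per symbol in 2010 via MAX_BY, instead of the ROW_NUMBER() window above.
last_quotes_2010 = spark.sql("""
    SELECT symbol, MAX_BY(close_value, date_value) AS close_value
    FROM stocks_enriched
    WHERE year_value = 2010 AND symbol IS NOT NULL AND close_value IS NOT NULL
    GROUP BY symbol
""")
last_quotes_2010.show(5)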

def main(scon, spark):
    register_base_views(spark)

    print("(a) First and last listing per symbol:")
    query_first_and_last_listing(spark).show(20, truncate=False)
    print("(b) Closing-price statistics for 2009 per symbol:")
    query_close_stats_2009(spark).show(20, truncate=False)
    print("(c) Portfolio metrics per symbol:")
    query_portfolio_symbol_stats(spark).show(20, truncate=False)
    print("(d) Symbols that appear in no portfolio:")
    query_symbols_missing_in_portfolios(spark).show(20, truncate=False)
    print("(e) Portfolio values at the end of 2010:")
    query_portfolio_values_2010(spark).show(20, truncate=False)


if __name__ == "__main__":
    main(scon, spark)

Added (new version):

def main(scon, spark):
    # Task 12
    init_view_stocks(spark)
    task_12_stocks_analysis(spark)


if __name__ == '__main__':
    main(scon, spark)