Compare commits

..

1 Commit

Author SHA1 Message Date
1b2de95b2e 12 2025-12-11 21:30:51 +01:00
2 changed files with 356 additions and 599 deletions

View File

@@ -1,366 +1,276 @@
from sparkstart import scon, spark
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import pandas as pd
import time

HDFSPATH = "hdfs://193.174.205.250:54310/"
HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/"


def read_parquets(spark: SparkSession):
    """Load the station master data and the hourly measurements from parquet."""
    stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
    products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"

    stations_df = spark.read.parquet(stations_path)
    stations_df.createOrReplaceTempView("german_stations")
    stations_df.cache()

    products_df = spark.read.parquet(products_path)
    products_df.createOrReplaceTempView("german_stations_data")
    products_df.cache()
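
# Optional sketch: cache() above is lazy, so the views are only materialized
# once an action runs. A quick count makes that explicit and verifies that both
# temp views are registered.
def sketch_check_views(spark: SparkSession):
    """Sketch: trigger the lazy caches and confirm the two views exist."""
    print(spark.table("german_stations").count(), "stations")
    print(spark.table("german_stations_data").count(), "hourly measurements")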
def task_11a_rollup(spark: SparkSession, station_name="Kempten"):
print(f"\n--- Aufgabe 11a: Rollup & Plotting für {station_name} ---")
start_time = time.time()
def _escape_like(value: str) -> str: # 1. Station ID finden
"""Escape single quotes for safe SQL literal usage.""" # Case-insensitive search
return value.replace("'", "''") sid_df = spark.sql(f"SELECT stationId FROM german_stations WHERE lower(station_name) LIKE '%{station_name.lower()}%'")
try:
sid = sid_df.collect()[0]['stationId']
print(f"Station found: {station_name} -> ID {sid}")
except IndexError:
print(f"Station {station_name} nicht gefunden.")
return
# 2. Rollup Query vorbereiten
def resolve_station_id(spark: SparkSession, station_identifier) -> int: # FIX: Parse string date 'YYYYMMDD' to real DATE object first
"""Resolve station id either from int input or fuzzy name search.""" q_prep = f"""
if isinstance(station_identifier, int):
return station_identifier
if isinstance(station_identifier, str) and station_identifier.strip().isdigit():
return int(station_identifier.strip())
if isinstance(station_identifier, str):
needle = _escape_like(station_identifier.lower())
q = (
"SELECT stationId FROM german_stations "
f"WHERE lower(station_name) LIKE '%{needle}%' ORDER BY station_name LIMIT 1"
)
result = spark.sql(q).collect()
if not result:
raise ValueError(f"No station found for pattern '{station_identifier}'")
return int(result[0]["stationId"])
raise ValueError("station_identifier must be int or str")
def build_station_rollup_for_station(spark: SparkSession, station_identifier) -> None:
"""Create rollup view with min/max/avg per hour/day/month/quarter/year."""
station_id = resolve_station_id(spark, station_identifier)
q = f"""
WITH base AS (
SELECT SELECT
d.stationId, YEAR(TO_DATE(date, 'yyyyMMdd')) as yr,
gs.station_name, QUARTER(TO_DATE(date, 'yyyyMMdd')) as qt,
TO_TIMESTAMP(CONCAT(d.date, LPAD(CAST(d.hour AS STRING), 2, '0')), 'yyyyMMddHH') AS hour_ts, MONTH(TO_DATE(date, 'yyyyMMdd')) as mo,
TO_DATE(d.date, 'yyyyMMdd') AS day_date, DAY(TO_DATE(date, 'yyyyMMdd')) as da,
MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS month_in_year, TT_TU
QUARTER(TO_DATE(d.date, 'yyyyMMdd')) AS quarter_in_year, FROM german_stations_data
YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS year_value, WHERE stationId = {sid}
d.TT_TU AS temperature AND TT_TU IS NOT NULL
FROM german_stations_data d AND TT_TU > -50
JOIN german_stations gs ON d.stationId = gs.stationId AND TT_TU < 60
WHERE d.stationId = {station_id}
AND d.TT_TU IS NOT NULL
AND d.TT_TU <> -999
),
rollup_base AS (
SELECT
stationId,
station_name,
hour_ts,
day_date,
month_in_year,
quarter_in_year,
year_value,
MIN(temperature) AS min_temp,
MAX(temperature) AS max_temp,
AVG(temperature) AS avg_temp
FROM base
GROUP BY stationId, station_name, ROLLUP(year_value, quarter_in_year, month_in_year, day_date, hour_ts)
)
SELECT
stationId,
station_name,
hour_ts,
day_date,
month_in_year,
quarter_in_year,
year_value,
CASE WHEN month_in_year IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-', LPAD(CAST(month_in_year AS STRING), 2, '0'), '-01')) END AS month_start_date,
CASE WHEN quarter_in_year IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-', LPAD(CAST(quarter_in_year * 3 - 2 AS STRING), 2, '0'), '-01')) END AS quarter_start_date,
CASE WHEN year_value IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-01-01')) END AS year_start_date,
min_temp,
max_temp,
avg_temp
FROM rollup_base
""" """
rollup_df = spark.sql(q) spark.sql(q_prep).createOrReplaceTempView("data_prep")
rollup_df.cache()
rollup_df.createOrReplaceTempView("station_rollup")
# 3. Rollup Execution
# Note: We use string construction for quarters/months to ensure we get a valid date string for plotting
q_rollup = """
SELECT
yr, qt, mo, da,
MIN(TT_TU) as min_temp,
MAX(TT_TU) as max_temp,
AVG(TT_TU) as avg_temp,
def _year_window(spark: SparkSession, years_back: int, station_id: int) -> tuple[int, int] | None: -- Construct dates for plotting (handling the NULLs from ROLLUP)
stats = spark.sql( -- For Quarter: Use 1st month of quarter
f"SELECT MIN(year_value) AS min_year, MAX(year_value) AS max_year FROM station_rollup WHERE year_value IS NOT NULL AND stationId = {station_id}" DATE(concat_ws('-', yr, cast(qt*3-2 as int), '01')) as qt_date,
).collect() -- For Month: Use 1st day of month
if not stats or stats[0]["max_year"] is None: MAKE_DATE(yr, mo, 1) as mo_date,
return None -- For Year: Use Jan 1st
min_year = int(stats[0]["min_year"]) MAKE_DATE(yr, 1, 1) as yr_date,
max_year = int(stats[0]["max_year"]) -- For Day: Use actual date
start_year = max(min_year, max_year - years_back + 1) MAKE_DATE(yr, mo, da) as da_date
return start_year, max_year
FROM data_prep
GROUP BY ROLLUP(yr, qt, mo, da)
"""
def plot_station_rollup_levels( df_rollup = spark.sql(q_rollup)
spark: SparkSession, df_rollup.cache()
station_identifier, df_rollup.createOrReplaceTempView("station_rollup")
day_span_years: int = 3,
agg_span_years: int = 15,
) -> None:
"""Plot day, month, quarter, and year aggregates for the given station."""
station_id = resolve_station_id(spark, station_identifier)
needs_refresh = not spark.catalog.tableExists("station_rollup")
if not needs_refresh:
count = spark.sql(
f"SELECT COUNT(*) AS cnt FROM station_rollup WHERE stationId = {station_id}"
).collect()[0]["cnt"]
needs_refresh = count == 0
if needs_refresh:
build_station_rollup_for_station(spark, station_id)
day_window = _year_window(spark, day_span_years, station_id) # Trigger Action
if day_window is None: count = df_rollup.count()
print("No data available for plotting") print(f"Rollup berechnet. Zeilen: {count}. Dauer: {time.time() - start_time:.2f}s")
return
month_window = _year_window(spark, agg_span_years, station_id)
if month_window is None:
print("No aggregated window available")
return
def _plot(query: str, figure_idx: int, title: str, x_col: str = "bucket_date") -> None: # --- PLOTTING ---
pdf = spark.sql(query).toPandas()
if pdf.empty: # Plot 1: Tageswerte (letzte 3 Jahre)
print(f"No data for {title}") # Filter: All levels must be present (not null)
return q_days = """
plt.figure(num=figure_idx) SELECT da_date as date, avg_temp
plt.clf() FROM station_rollup
metrics = [ WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NOT NULL
("min_temp", "Min", "#1f77b4"), AND yr >= (SELECT MAX(yr) - 2 FROM station_rollup WHERE yr IS NOT NULL)
("avg_temp", "Avg", "#ff7f0e"), ORDER BY date
("max_temp", "Max", "#2ca02c"), """
] pdf_days = spark.sql(q_days).toPandas()
for col, label, color in metrics:
if col in pdf: if pdf_days.empty:
plt.plot(pdf[x_col], pdf[col], label=label, color=color) print("Warnung: Keine Daten für Tages-Plot gefunden.")
plt.title(title) else:
plt.xlabel("Datum") plt.figure(1, figsize=(10, 5))
plt.ylabel("Temperatur (°C)") plt.plot(pdf_days['date'], pdf_days['avg_temp'], label='Daily Avg', linewidth=0.5)
plt.title(f"{station_name}: Daily Average (Last 3 Years)")
plt.xlabel('Date')
plt.ylabel('Temp °C')
plt.tight_layout()
plt.show()
# Plot 2: Monatswerte (10-20 Jahre)
# Filter: Day is NULL (aggregation level), but Month is NOT NULL
q_months = """
SELECT mo_date as date, avg_temp
FROM station_rollup
WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NULL
AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL)
ORDER BY date
"""
pdf_months = spark.sql(q_months).toPandas()
if not pdf_months.empty:
plt.figure(2, figsize=(10, 5))
plt.plot(pdf_months['date'], pdf_months['avg_temp'], color='green', label='Monthly Avg')
plt.title(f"{station_name}: Monthly Average (Last 20 Years)")
plt.xlabel('Date')
plt.ylabel('Temp °C')
plt.tight_layout()
plt.show()
# Plot 3: Quartalswerte
# Filter: Month is NULL, Quarter is NOT NULL
q_quarters = """
SELECT qt_date as date, avg_temp
FROM station_rollup
WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NULL AND da IS NULL
AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL)
ORDER BY date
"""
pdf_quarters = spark.sql(q_quarters).toPandas()
if not pdf_quarters.empty:
plt.figure(3, figsize=(10, 5))
plt.plot(pdf_quarters['date'], pdf_quarters['avg_temp'], color='orange', marker='o', linestyle='-', label='Quarterly Avg')
plt.title(f"{station_name}: Quarterly Average (Last 20 Years)")
plt.tight_layout()
plt.show()
# Plot 4: Jahreswerte
# Filter: Quarter is NULL, Year is NOT NULL
q_years = """
SELECT yr_date as date, min_temp, max_temp, avg_temp
FROM station_rollup
WHERE yr IS NOT NULL AND qt IS NULL AND mo IS NULL AND da IS NULL
AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL)
ORDER BY date
"""
pdf_years = spark.sql(q_years).toPandas()
if not pdf_years.empty:
plt.figure(4, figsize=(10, 5))
plt.plot(pdf_years['date'], pdf_years['max_temp'], color='red', label='Max')
plt.plot(pdf_years['date'], pdf_years['avg_temp'], color='black', label='Avg')
plt.plot(pdf_years['date'], pdf_years['min_temp'], color='blue', label='Min')
plt.title(f"{station_name}: Yearly Aggregates (Last 20 Years)")
plt.legend() plt.legend()
plt.tight_layout() plt.tight_layout()
plt.show() plt.show()
day_start, day_end = day_window
q_day = f"""
SELECT day_date AS bucket_date, min_temp, avg_temp, max_temp
FROM station_rollup
WHERE stationId = {station_id}
AND hour_ts IS NULL
AND day_date IS NOT NULL
AND year_value BETWEEN {day_start} AND {day_end}
ORDER BY bucket_date
"""
_plot(q_day, 1, f"Tagesmittelwerte {day_start}-{day_end}")
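
# Optional sketch: instead of testing yr/qt/mo/da for NULL (as the plot queries
# above do), the ROLLUP level can be read off directly with grouping_id(), which
# sets one bit per rolled-up column. Assumes the data_prep view created in
# task_11a_rollup.
def sketch_rollup_levels(spark: SparkSession):
    """Sketch: label each ROLLUP row with its aggregation level."""
    q_levels = """
        SELECT
            yr, qt, mo, da,
            AVG(TT_TU) as avg_temp,
            grouping_id(yr, qt, mo, da) as gid
        FROM data_prep
        GROUP BY ROLLUP(yr, qt, mo, da)
    """
    # gid = 0 -> day rows, 1 -> month rows, 3 -> quarter rows, 7 -> year rows, 15 -> grand total
    spark.sql(q_levels).where("gid = 3").show(5)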


def task_11b_rank(spark: SparkSession):
    print("\n--- Task 11b: TempMonat ranking ---")

    q_tempmonat = """
        SELECT
            d.stationId,
            s.station_name,
            SUBSTR(CAST(d.date AS STRING), 1, 4) as year,
            SUBSTR(CAST(d.date AS STRING), 5, 2) as month,  -- 'yyyyMMdd': month starts at offset 5
            MIN(d.TT_TU) as min_t,
            MAX(d.TT_TU) as max_t,
            AVG(d.TT_TU) as avg_t
        FROM german_stations_data d
        JOIN german_stations s ON d.stationId = s.stationId
        WHERE d.TT_TU IS NOT NULL AND d.TT_TU > -50
        GROUP BY d.stationId, s.station_name, year, month
    """
    df_tm = spark.sql(q_tempmonat)
    df_tm.createOrReplaceTempView("tempmonat")

    # 1. Ranking partitioned by month within 2015
    print(" > Computing the 2015 ranking (partitioned by month)...")
    q_rank_2015 = """
        SELECT
            month, station_name, min_t,
            RANK() OVER (PARTITION BY month ORDER BY min_t ASC) as rank_min,
            RANK() OVER (PARTITION BY month ORDER BY max_t ASC) as rank_max,
            RANK() OVER (PARTITION BY month ORDER BY avg_t ASC) as rank_avg
        FROM tempmonat
        WHERE year = '2015'
        ORDER BY rank_min, month
    """
    spark.sql(q_rank_2015).show(10)

    # 2. Global ranking (across all months and years)
    print(" > Computing the global ranking (coldest months overall)...")
    q_rank_global = """
        SELECT
            year, month, station_name, min_t,
            RANK() OVER (ORDER BY min_t ASC) as rank_min,
            RANK() OVER (ORDER BY max_t ASC) as rank_max,
            RANK() OVER (ORDER BY avg_t ASC) as rank_avg
        FROM tempmonat
        ORDER BY rank_min
    """
    spark.sql(q_rank_global).show(10)
    print("11b: done.")


def task_11c_groupingsets(spark: SparkSession):
    print("\n--- Task 11c: Grouping sets ---")

    q_prep = """
        SELECT
            CAST(SUBSTR(CAST(d.date AS STRING), 1, 4) AS INT) as year,
            CAST(SUBSTR(CAST(d.date AS STRING), 5, 2) AS INT) as month,  -- 'yyyyMMdd': month starts at offset 5
            s.station_name,
            s.bundesland,
            d.TT_TU
        FROM german_stations_data d
        JOIN german_stations s ON d.stationId = s.stationId
        WHERE d.TT_TU > -50
    """
    spark.sql(q_prep).createOrReplaceTempView("gs_base")

    q_sets = """
        SELECT
            year,
            month,
            bundesland,
            station_name,
            MIN(TT_TU) as min_t, MAX(TT_TU) as max_t, AVG(TT_TU) as avg_t
        FROM gs_base
        GROUP BY GROUPING SETS (
            (year, bundesland),
            (year, station_name),
            (month, bundesland)
        )
    """
    df_gs = spark.sql(q_sets)
    df_gs.cache()
    df_gs.createOrReplaceTempView("grouping_result")
    # Action so the cache is populated
    df_gs.count()
    print("Grouping sets computed.")

    print("Selection 1: year & Bundesland")
    spark.sql("SELECT year, bundesland, avg_t FROM grouping_result WHERE station_name IS NULL AND month IS NULL ORDER BY year DESC, bundesland").show(5)

    print("Selection 2: year & station")
    spark.sql("SELECT year, station_name, avg_t FROM grouping_result WHERE bundesland IS NULL AND month IS NULL ORDER BY year DESC, station_name").show(5)

    print("Selection 3: month & Bundesland (seasonal profile per state)")
    spark.sql("SELECT month, bundesland, avg_t FROM grouping_result WHERE year IS NULL AND station_name IS NULL ORDER BY bundesland, month").show(5)
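
# Optional sketch: selection 3 above (month x Bundesland) lends itself to a
# quick seasonal plot; pivoting in pandas gives one line per state. Assumes the
# grouping_result view from task_11c_groupingsets.
def sketch_plot_month_bundesland(spark: SparkSession):
    """Sketch: plot the average monthly temperature per Bundesland."""
    pdf = spark.sql("""
        SELECT month, bundesland, avg_t
        FROM grouping_result
        WHERE year IS NULL AND station_name IS NULL
    """).toPandas()
    if pdf.empty:
        return
    pivot = pdf.pivot(index="month", columns="bundesland", values="avg_t").sort_index()
    pivot.plot(figsize=(10, 5), title="Monthly average temperature per Bundesland")
    plt.xlabel("Month")
    plt.ylabel("Temp °C")
    plt.tight_layout()
    plt.show()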


def main(scon, spark):
    read_parquets(spark)

    # Task 11
    task_11a_rollup(spark, station_name="Kempten")
    task_11b_rank(spark)
    task_11c_groupingsets(spark)


if __name__ == '__main__':
    main(scon, spark)

View File

@@ -1,276 +1,123 @@
from sparkstart import scon, spark
from pyspark.sql import SparkSession
import time
import matplotlib.pyplot as plt
import pandas as pd

HDFSPATH_STATIONS = "hdfs://193.174.205.250:54310/home/heiserervalentin/"
HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/"


def init_view_stocks(spark):
    """Load the stocks and portfolio data for task 12."""
    # Note: adjust the paths if they are located differently on the cluster
    spark.read.parquet(HDFSPATH_STOCKS + "stocks.parquet").createOrReplaceTempView("stocks")
    spark.read.parquet(HDFSPATH_STOCKS + "portfolio.parquet").createOrReplaceTempView("portfolio")
# ---------------------------------------------------------
# TASK 12
# ---------------------------------------------------------
_DATE_FALLBACK_EXPR = "COALESCE(date_value, TO_DATE(date_str), TO_DATE(date_str, 'yyyyMMdd'))" def task_12_stocks_analysis(spark: SparkSession):
print("\n--- Aufgabe 12: Stocks & Portfolio ---")
# a) Erstes und letztes Datum je Symbol
def _resolve_column_name(columns: Sequence[str], candidates: Iterable[str]) -> str: print("a) Min/Max Datum pro Symbol")
t0 = time.time()
lowered = {col.lower(): col for col in columns} q_a = """
for candidate in candidates: SELECT symbol, MIN(date) as first_date, MAX(date) as last_date
match = lowered.get(candidate.lower()) FROM stocks
if match:
return match
raise ValueError(f"None of the candidate columns {list(candidates)} exist in {columns}")
def _normalize_stocks_view(spark: SparkSession) -> None:
stocks_path = HDFSPATH + "stocks/stocks.parquet"
stocks_df = spark.read.parquet(stocks_path)
symbol_col = _resolve_column_name(stocks_df.columns, ("symbol", "ticker"))
date_col = _resolve_column_name(stocks_df.columns, ("date", "pricedate", "dt"))
close_col = _resolve_column_name(stocks_df.columns, ("close", "closeprice", "closingprice"))
stocks_df = (
stocks_df
.select(
F.col(symbol_col).alias("symbol"),
F.col(date_col).alias("raw_date"),
F.col(close_col).alias("close_raw"),
)
.withColumn("date_str", F.col("raw_date").cast("string"))
)
date_candidates = [
F.col("raw_date").cast("date"),
F.to_date("raw_date"),
F.to_date("date_str"),
F.to_date("date_str", "yyyyMMdd"),
F.to_date("date_str", "MM/dd/yyyy"),
]
stocks_df = (
stocks_df
.withColumn("date_value", F.coalesce(*date_candidates))
.withColumn("year_value", F.substring("date_str", 1, 4).cast("int"))
.withColumn("close_value", F.col("close_raw").cast("double"))
.select("symbol", "date_value", "date_str", "year_value", "close_value")
)
stocks_df.cache()
stocks_df.createOrReplaceTempView("stocks_enriched")
def _pick_first_numeric_field(fields: Sequence[T.StructField]) -> str:
numeric_types = (
T.ByteType,
T.ShortType,
T.IntegerType,
T.LongType,
T.FloatType,
T.DoubleType,
T.DecimalType,
)
for field in fields:
if isinstance(field.dataType, numeric_types):
return field.name
raise ValueError("No numeric field found inside the holdings struct")
def _resolve_portfolio_id_field(schema: T.StructType) -> str:
priority = ("portfolio_id", "portfolioid", "id")
lowered = {field.name.lower(): field.name for field in schema.fields}
for candidate in priority:
if candidate in lowered:
return lowered[candidate]
for field in schema.fields:
if not isinstance(field.dataType, (T.ArrayType, T.MapType)):
return field.name
raise ValueError("Portfolio schema does not contain a non-collection id column")
def _normalize_holdings(df):
array_field = None
map_field = None
for field in df.schema.fields:
if isinstance(field.dataType, T.ArrayType) and isinstance(field.dataType.elementType, T.StructType):
array_field = field
break
if isinstance(field.dataType, T.MapType) and isinstance(field.dataType.keyType, T.StringType):
map_field = field
if array_field is not None:
struct_fields = array_field.dataType.elementType.fields
symbol_field = _resolve_column_name([f.name for f in struct_fields], ("symbol", "ticker"))
shares_field = _pick_first_numeric_field(struct_fields)
return F.expr(
f"transform(`{array_field.name}`, x -> named_struct('symbol', x.`{symbol_field}`, 'shares', CAST(x.`{shares_field}` AS DOUBLE)))"
)
if map_field is not None and isinstance(map_field.dataType.valueType, (T.IntegerType, T.LongType, T.FloatType, T.DoubleType, T.DecimalType)):
return F.expr(
f"transform(map_entries(`{map_field.name}`), x -> named_struct('symbol', x.key, 'shares', CAST(x.value AS DOUBLE)))"
)
raise ValueError("Could not locate holdings column (array<struct> or map) in portfolio data")
def _normalize_portfolio_view(spark: SparkSession) -> None:
portfolio_path = HDFSPATH + "stocks/portfolio.parquet"
portfolio_df = spark.read.parquet(portfolio_path)
id_col = _resolve_portfolio_id_field(portfolio_df.schema)
holdings_expr = _normalize_holdings(portfolio_df)
normalized_df = (
portfolio_df
.select(
F.col(id_col).alias("portfolio_id"),
holdings_expr.alias("holdings"),
)
)
normalized_df.cache()
normalized_df.createOrReplaceTempView("portfolio")
spark.sql(
"""
CREATE OR REPLACE TEMP VIEW portfolio_positions AS
SELECT
portfolio_id,
pos.symbol AS symbol,
pos.shares AS shares
FROM portfolio
LATERAL VIEW explode(holdings) exploded AS pos
"""
)
def register_base_views(spark: SparkSession) -> None:
_normalize_stocks_view(spark)
_normalize_portfolio_view(spark)
def query_first_and_last_listing(spark: SparkSession):
q = f"""
SELECT
symbol,
MIN({_DATE_FALLBACK_EXPR}) AS first_listing,
MAX({_DATE_FALLBACK_EXPR}) AS last_listing
FROM stocks_enriched
WHERE symbol IS NOT NULL
GROUP BY symbol GROUP BY symbol
ORDER BY symbol ORDER BY symbol
""" """
return spark.sql(q) spark.sql(q_a).show(5)
print(f"Zeit a): {time.time()-t0:.2f}s")
# b) Aggregationen 2009
def query_close_stats_2009(spark: SparkSession): print("\nb) High/Low/Avg Close 2009")
t0 = time.time()
q = """ q_b = """
SELECT SELECT symbol, MAX(close) as max_close, MIN(close) as min_close, AVG(close) as avg_close
symbol, FROM stocks
MAX(close_value) AS max_close, WHERE YEAR(date) = 2009
MIN(close_value) AS min_close,
AVG(close_value) AS avg_close
FROM stocks_enriched
WHERE year_value = 2009 AND close_value IS NOT NULL AND symbol IS NOT NULL
GROUP BY symbol GROUP BY symbol
ORDER BY symbol ORDER BY symbol
""" """
return spark.sql(q) spark.sql(q_b).show(5)
print(f"Zeit b): {time.time()-t0:.2f}s")
# c) Lateral View (Explode Portfolio)
print("\nc) Lateral View: Aktien in Portfolios")
t0 = time.time()
def query_portfolio_symbol_stats(spark: SparkSession): q_c = """
q = """
SELECT SELECT
symbol, h.symbol,
SUM(shares) AS total_shares, SUM(h.amount) as total_shares,
COUNT(DISTINCT portfolio_id) AS portfolio_count, COUNT(p.portfolioId) as num_portfolios,
AVG(shares) AS avg_shares_per_portfolio AVG(h.amount) as avg_per_portfolio
FROM portfolio_positions FROM portfolio p
WHERE symbol IS NOT NULL LATERAL VIEW explode(holdings) t AS h
GROUP BY symbol GROUP BY h.symbol
ORDER BY symbol ORDER BY h.symbol
""" """
return spark.sql(q) spark.sql(q_c).show(5)
print(f"Zeit c): {time.time()-t0:.2f}s")
# d) Symbole in keinem Portfolio (Anti Join)
def query_symbols_missing_in_portfolios(spark: SparkSession): print("\nd) Symbole ohne Portfolio")
t0 = time.time()
q = """ q_d = """
SELECT DISTINCT s.symbol SELECT DISTINCT s.symbol
FROM stocks_enriched s FROM stocks s
LEFT ANTI JOIN (SELECT DISTINCT symbol FROM portfolio_positions WHERE symbol IS NOT NULL) p LEFT ANTI JOIN (
ON s.symbol = p.symbol SELECT DISTINCT h.symbol
WHERE s.symbol IS NOT NULL FROM portfolio p
LATERAL VIEW explode(holdings) t AS h
) p_sym ON s.symbol = p_sym.symbol
ORDER BY s.symbol ORDER BY s.symbol
""" """
return spark.sql(q) spark.sql(q_d).show(5)
print(f"Zeit d): {time.time()-t0:.2f}s")
input(">> 12 a-d fertig. Check UI. Enter für e)...")
def query_portfolio_values_2010(spark: SparkSession): # e) Portfolio Wert Ende 2010
print("\ne) Portfolio Bewertung Ende 2010")
t0 = time.time()
q = f""" q_last_price = """
WITH quotes_2010 AS ( SELECT symbol, close
FROM (
SELECT SELECT
symbol, symbol,
close_value, close,
ROW_NUMBER() OVER ( ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY date DESC) as rn
PARTITION BY symbol FROM stocks
ORDER BY {_DATE_FALLBACK_EXPR} DESC, date_str DESC WHERE YEAR(date) = 2010
) AS rn ) tmp
FROM stocks_enriched
WHERE year_value = 2010 AND symbol IS NOT NULL AND close_value IS NOT NULL
),
last_quotes AS (
SELECT symbol, close_value
FROM quotes_2010
WHERE rn = 1 WHERE rn = 1
),
portfolio_values AS (
SELECT
pp.portfolio_id,
SUM(pp.shares * lq.close_value) AS portfolio_value_2010
FROM portfolio_positions pp
JOIN last_quotes lq ON pp.symbol = lq.symbol
GROUP BY pp.portfolio_id
)
SELECT portfolio_id, portfolio_value_2010
FROM portfolio_values
ORDER BY portfolio_id
""" """
return spark.sql(q) spark.sql(q_last_price).createOrReplaceTempView("stocks_2010_end")
# Schritt 2: Portfolio explodieren, mit Preis joinen, berechnen, summieren
q_val = """
SELECT
p.portfolioId,
SUM(h.amount * s.close) as portfolio_value_2010
FROM portfolio p
LATERAL VIEW explode(holdings) t AS h
JOIN stocks_2010_end s ON h.symbol = s.symbol
GROUP BY p.portfolioId
ORDER BY p.portfolioId
"""
spark.sql(q_val).show(5)
print(f"Zeit e): {time.time()-t0:.2f}s")


def main(scon, spark):
    # Task 12
    init_view_stocks(spark)
    task_12_stocks_analysis(spark)


if __name__ == '__main__':
    main(scon, spark)