2025-12-11 20:55:44 +01:00
parent d18e9823e5
commit ea4daa022c
4 changed files with 452 additions and 334 deletions


@@ -5,7 +5,6 @@ import matplotlib.pyplot as plt
HDFSPATH = "hdfs://193.174.205.250:54310/" HDFSPATH = "hdfs://193.174.205.250:54310/"
def read_parquets(spark: SparkSession): def read_parquets(spark: SparkSession):
stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet" stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet" products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"


@@ -1,366 +1,187 @@
Old version (removed):

from sparkstart import scon, spark
from pyspark.sql import SparkSession
import time
import matplotlib.pyplot as plt
import pandas as pd

HDFSPATH_STATIONS = "hdfs://193.174.205.250:54310/home/heiserervalentin/"
HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/"

def init_view_stations(spark):
    """Lädt die Stationsdaten für Aufgabe 11"""
    s_path = HDFSPATH_STATIONS + "german_stations.parquet"
    d_path = HDFSPATH_STATIONS + "german_stations_data.parquet"
    spark.read.parquet(s_path).createOrReplaceTempView("german_stations")
    spark.read.parquet(d_path).createOrReplaceTempView("german_stations_data")

def init_view_stocks(spark):
    """Lädt die Stocks-Daten für Aufgabe 12"""
    # Hinweis: Pfade anpassen, falls sie im Cluster anders liegen
    spark.read.parquet(HDFSPATH_STOCKS + "stocks.parquet").createOrReplaceTempView("stocks")
    spark.read.parquet(HDFSPATH_STOCKS + "portfolio.parquet").createOrReplaceTempView("portfolio")

# ---------------------------------------------------------
# AUFGABE 11
# ---------------------------------------------------------
def task_11a_rollup(spark: SparkSession, station_name="Kempten"):
    print(f"\n--- Aufgabe 11a: Rollup & Plotting für {station_name} ---")
    start = time.time()

    # 1. Station ID finden
    sid_df = spark.sql(f"SELECT stationId FROM german_stations WHERE station_name LIKE '%{station_name}%'")
    try:
        sid = sid_df.collect()[0]['stationId']
    except IndexError:
        print(f"Station {station_name} nicht gefunden.")
        return

    # 2. Rollup Query vorbereiten
    q_prep = f"""
        SELECT
            YEAR(date) as yr,
            QUARTER(date) as qt,
            MONTH(date) as mo,
            DAY(date) as da,
            TT_TU
        FROM german_stations_data
        WHERE stationId = {sid} AND TT_TU IS NOT NULL AND TT_TU > -50
    """
    spark.sql(q_prep).createOrReplaceTempView("data_prep")

    q_rollup = """
        SELECT
            yr, qt, mo, da,
            MIN(TT_TU) as min_temp,
            MAX(TT_TU) as max_temp,
            AVG(TT_TU) as avg_temp,
            -- Datums-Konstruktion für Plots
            DATE(STRING(yr) || '-' || STRING(qt*3-2) || '-01') as qt_date,
            MAKE_DATE(yr, mo, 1) as mo_date,
            MAKE_DATE(yr, 1, 1) as yr_date,
            MAKE_DATE(yr, mo, da) as da_date
        FROM data_prep
        GROUP BY ROLLUP(yr, qt, mo, da)
    """

    df_rollup = spark.sql(q_rollup)
    df_rollup.cache()
    df_rollup.createOrReplaceTempView("station_rollup")

    # Trigger Action for Cache & Time Measurement
    count = df_rollup.count()
    print(f"Rollup berechnet. Zeilen: {count}. Dauer: {time.time() - start:.2f}s")
    input(">> 11a: Check Spark UI (Stages/Storage) jetzt. Enter für Plots...")

    # Plot 1: Tageswerte (letzte 3 Jahre der Daten)
    q_days = """
        SELECT da_date as date, avg_temp
        FROM station_rollup
        WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NOT NULL
          AND yr >= (SELECT MAX(yr) - 2 FROM station_rollup)
        ORDER BY date
    """
    pdf_days = spark.sql(q_days).toPandas()

    plt.figure(1, figsize=(10, 5))
    plt.plot(pdf_days['date'], pdf_days['avg_temp'], label='Daily Avg', linewidth=0.5)
    plt.title(f"{station_name}: Daily Average (Last 3 Years)")
    plt.xlabel('Date')
    plt.ylabel('Temp °C')
    plt.tight_layout()
    plt.show()

    # Plot 2: Monatswerte (10-20 Jahre)
    q_months = """
        SELECT mo_date as date, avg_temp
        FROM station_rollup
        WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NULL
          AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup)
        ORDER BY date
    """
    pdf_months = spark.sql(q_months).toPandas()
    plt.figure(2, figsize=(10, 5))
    plt.plot(pdf_months['date'], pdf_months['avg_temp'], color='green', label='Monthly Avg')
    plt.title(f"{station_name}: Monthly Average (Last 20 Years)")
    plt.xlabel('Date')
    plt.ylabel('Temp °C')
    plt.tight_layout()
    plt.show()

    # Plot 3: Quartalswerte
    q_quarters = """
        SELECT qt_date as date, avg_temp
        FROM station_rollup
        WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NULL AND da IS NULL
          AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup)
        ORDER BY date
    """
    pdf_quarters = spark.sql(q_quarters).toPandas()
    plt.figure(3, figsize=(10, 5))
    plt.plot(pdf_quarters['date'], pdf_quarters['avg_temp'], color='orange', marker='o', linestyle='-', label='Quarterly Avg')
    plt.title(f"{station_name}: Quarterly Average (Last 20 Years)")
    plt.show()

    # Plot 4: Jahreswerte
    q_years = """
        SELECT yr_date as date, min_temp, max_temp, avg_temp
        FROM station_rollup
        WHERE yr IS NOT NULL AND qt IS NULL AND mo IS NULL AND da IS NULL
          AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup)
        ORDER BY date
    """
    pdf_years = spark.sql(q_years).toPandas()
    plt.figure(4, figsize=(10, 5))
    plt.plot(pdf_years['date'], pdf_years['max_temp'], color='red', label='Max')
    plt.plot(pdf_years['date'], pdf_years['avg_temp'], color='black', label='Avg')
    plt.plot(pdf_years['date'], pdf_years['min_temp'], color='blue', label='Min')
    plt.title(f"{station_name}: Yearly Aggregates (Last 20 Years)")
    plt.legend()
    plt.grid(True)
    plt.show()

def task_11b_rank(spark: SparkSession):
    print("\n--- Aufgabe 11b: TempMonat Ranking ---")
    start = time.time()

    q_tempmonat = """
        SELECT
            d.stationId,
            s.station_name,
            SUBSTR(CAST(d.date AS STRING), 1, 4) as year,
            SUBSTR(CAST(d.date AS STRING), 6, 2) as month,
            MIN(d.TT_TU) as min_t,
            MAX(d.TT_TU) as max_t,
            AVG(d.TT_TU) as avg_t
        FROM german_stations_data d
        JOIN german_stations s ON d.stationId = s.stationId
        WHERE d.TT_TU IS NOT NULL AND d.TT_TU > -50
        GROUP BY d.stationId, s.station_name, year, month
    """
    df_tm = spark.sql(q_tempmonat)
    df_tm.createOrReplaceTempView("tempmonat")

    # 1. Ranking Partitioniert nach Monat im Jahr 2015
    print(" > Berechne Ranking für 2015 (partitioniert nach Monat)...")
    q_rank_2015 = """
        SELECT
            month, station_name, min_t,
            RANK() OVER (PARTITION BY month ORDER BY min_t ASC) as rank_min,
            RANK() OVER (PARTITION BY month ORDER BY max_t ASC) as rank_max,
            RANK() OVER (PARTITION BY month ORDER BY avg_t ASC) as rank_avg
        FROM tempmonat
        WHERE year = '2015'
        ORDER BY rank_min, month
    """
    spark.sql(q_rank_2015).show(10)

    # 2. Globales Ranking (über alle Monate/Jahre hinweg)
    print(" > Berechne Ranking global (kälteste Monate aller Zeiten)...")
    q_rank_global = """
        SELECT
            year, month, station_name, min_t,
            RANK() OVER (ORDER BY min_t ASC) as rank_min,
            RANK() OVER (ORDER BY max_t ASC) as rank_max,
            RANK() OVER (ORDER BY avg_t ASC) as rank_avg
        FROM tempmonat
        ORDER BY rank_min
    """
    spark.sql(q_rank_global).show(10)

    print(f"Dauer 11b: {time.time() - start:.2f}s")
    input(">> 11b: Check Spark UI (Jobs/Stages). Enter...")

def task_11c_groupingsets(spark: SparkSession):
    print("\n--- Aufgabe 11c: Grouping Sets ---")
    start = time.time()

    q_prep = """
        SELECT
            CAST(SUBSTR(CAST(d.date AS STRING), 1, 4) AS INT) as year,
            CAST(SUBSTR(CAST(d.date AS STRING), 6, 2) AS INT) as month,
            s.station_name,
            s.bundesland,
            d.TT_TU
        FROM german_stations_data d
        JOIN german_stations s ON d.stationId = s.stationId
        WHERE d.TT_TU > -50
    """
    spark.sql(q_prep).createOrReplaceTempView("gs_base")

    q_sets = """
        SELECT
            year,
            month,
            bundesland,
            station_name,
            MIN(TT_TU) as min_t, MAX(TT_TU) as max_t, AVG(TT_TU) as avg_t
        FROM gs_base
        GROUP BY GROUPING SETS (
            (year, bundesland),
            (year, station_name),
            (month, bundesland)
        )
    """
    df_gs = spark.sql(q_sets)
    df_gs.cache()
    df_gs.createOrReplaceTempView("grouping_result")

    # Action zum Cachen
    df_gs.count()
    print(f"Grouping Sets berechnet. Dauer: {time.time() - start:.2f}s")

    print("Auswahl 1: Jahr & Bundesland")
    spark.sql("SELECT year, bundesland, avg_t FROM grouping_result WHERE station_name IS NULL AND month IS NULL ORDER BY year DESC, bundesland").show(5)

    print("Auswahl 2: Jahr & Station")
    spark.sql("SELECT year, station_name, avg_t FROM grouping_result WHERE bundesland IS NULL AND month IS NULL ORDER BY year DESC, station_name").show(5)

    print("Auswahl 3: Monat & Bundesland (Jahreszeitlicher Verlauf je Land)")
    spark.sql("SELECT month, bundesland, avg_t FROM grouping_result WHERE year IS NULL AND station_name IS NULL ORDER BY bundesland, month").show(5)

    input(">> 11c: Check Spark UI (Zugriffspläne/Storage). Enter...")

# ---------------------------------------------------------
# AUFGABE 12
# ---------------------------------------------------------
def task_12_stocks_analysis(spark: SparkSession):
    print("\n--- Aufgabe 12: Stocks & Portfolio ---")

    # a) Erstes und letztes Datum je Symbol
    print("a) Min/Max Datum pro Symbol")
    t0 = time.time()
    q_a = """
        SELECT symbol, MIN(date) as first_date, MAX(date) as last_date
        FROM stocks
        GROUP BY symbol
        ORDER BY symbol
    """
    spark.sql(q_a).show(5)
    print(f"Zeit a): {time.time()-t0:.2f}s")

    # b) Aggregationen 2009
    print("\nb) High/Low/Avg Close 2009")
    t0 = time.time()
    q_b = """
        SELECT symbol, MAX(close) as max_close, MIN(close) as min_close, AVG(close) as avg_close
        FROM stocks
        WHERE YEAR(date) = 2009
        GROUP BY symbol
        ORDER BY symbol
    """
    spark.sql(q_b).show(5)
    print(f"Zeit b): {time.time()-t0:.2f}s")

    # c) Lateral View (Explode Portfolio)
    print("\nc) Lateral View: Aktien in Portfolios")
    t0 = time.time()
    q_c = """
        SELECT
            h.symbol,
            SUM(h.amount) as total_shares,
            COUNT(p.portfolioId) as num_portfolios,
            AVG(h.amount) as avg_per_portfolio
        FROM portfolio p
        LATERAL VIEW explode(holdings) t AS h
        GROUP BY h.symbol
        ORDER BY h.symbol
    """
    spark.sql(q_c).show(5)
    print(f"Zeit c): {time.time()-t0:.2f}s")

    # d) Symbole in keinem Portfolio (Anti Join)
    print("\nd) Symbole ohne Portfolio")
    t0 = time.time()
    q_d = """
        SELECT DISTINCT s.symbol
        FROM stocks s
        LEFT ANTI JOIN (
            SELECT DISTINCT h.symbol
            FROM portfolio p
            LATERAL VIEW explode(holdings) t AS h
        ) p_sym ON s.symbol = p_sym.symbol
        ORDER BY s.symbol
    """
    spark.sql(q_d).show(5)
    print(f"Zeit d): {time.time()-t0:.2f}s")

    input(">> 12 a-d fertig. Check UI. Enter für e)...")

    # e) Portfolio Wert Ende 2010
    print("\ne) Portfolio Bewertung Ende 2010")
    t0 = time.time()
    q_last_price = """
        SELECT symbol, close
        FROM (
            SELECT
                symbol,
                close,
                ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY date DESC) as rn
            FROM stocks
            WHERE YEAR(date) = 2010
        ) tmp
        WHERE rn = 1
    """
    spark.sql(q_last_price).createOrReplaceTempView("stocks_2010_end")

    # Schritt 2: Portfolio explodieren, mit Preis joinen, berechnen, summieren
    q_val = """
        SELECT
            p.portfolioId,
            SUM(h.amount * s.close) as portfolio_value_2010
        FROM portfolio p
        LATERAL VIEW explode(holdings) t AS h
        JOIN stocks_2010_end s ON h.symbol = s.symbol
        GROUP BY p.portfolioId
        ORDER BY p.portfolioId
    """
    spark.sql(q_val).show(5)
    print(f"Zeit e): {time.time()-t0:.2f}s")

# --- Execution ---
def main(scon, spark):
    init_view_stations(spark)

    # Aufgabe 11
    task_11a_rollup(spark, station_name="Kempten")
    task_11b_rank(spark)
    task_11c_groupingsets(spark)

    # Aufgabe 12
    init_view_stocks(spark)
    task_12_stocks_analysis(spark)

if __name__ == '__main__':
    main(scon, spark)

New version (added):

from __future__ import annotations
from sparkstart import scon, spark
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt

HDFSPATH = "hdfs://193.174.205.250:54310/"

def read_parquet_tables(spark: SparkSession) -> None:
    # Use your specific paths from Aufgabe 9
    stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
    products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"

    spark.read.parquet(stations_path).createOrReplaceTempView("german_stations")
    spark.read.parquet(products_path).createOrReplaceTempView("german_stations_data")

# --- Aufgabe A: Rollup and Plotting ---

def create_mma_rollup(spark: SparkSession, station_id: int):
    """
    Functionally identical to Musterlösung 'createDataFrame'.
    Uses your schema: TT_TU (temp), hour (messtunde), date (string yyyyMMdd).
    """
    query = f"""
    WITH processed_data AS (
        SELECT
            TT_TU,
            hour AS messtunde,
            TO_DATE(SUBSTR(date, 1, 4), 'yyyy') AS jahr,
            TO_DATE(CONCAT(SUBSTR(date, 1, 4), '-', LPAD(CAST(QUARTER(TO_DATE(date, 'yyyyMMdd'))*3-2 AS STRING), 2, '0'), '-01')) AS quartal,
            TO_DATE(CONCAT(SUBSTR(date, 1, 4), '-', SUBSTR(date, 5, 2), '-01')) AS monat,
            TO_DATE(date, 'yyyyMMdd') AS tag
        FROM german_stations_data
        WHERE stationId = {station_id}
          AND TT_TU IS NOT NULL
          AND TT_TU <> -999
    )
    SELECT
        MIN(TT_TU) AS minTemperatur,
        MAX(TT_TU) AS maxTemperatur,
        AVG(TT_TU) AS avgTemperatur,
        jahr,
        quartal,
        monat,
        tag,
        messtunde
    FROM processed_data
    GROUP BY ROLLUP (jahr, quartal, monat, tag, messtunde)
    ORDER BY jahr, quartal, monat, tag, messtunde
    """
    df = spark.sql(query)
    df.createOrReplaceTempView("mmacdcdata")
    df.cache()
    df.show(10)

def plot_date_values(spark: SparkSession, level: str):
    """Functionally identical plotting logic to Musterlösung 'plotDateValues'."""
    filters = {
        "days": "YEAR(jahr) > 2017 AND YEAR(jahr) < 2021 AND messtunde IS NULL AND tag IS NOT NULL",
        "months": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NOT NULL",
        "quartals": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NULL AND quartal IS NOT NULL",
        "years": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NULL AND quartal IS NULL AND jahr IS NOT NULL"
    }
    x_col = {"days": "tag", "months": "monat", "quartals": "quartal", "years": "jahr"}

    pdf = spark.sql(f"SELECT * FROM mmacdcdata WHERE {filters[level]}").toPandas()
    if pdf.empty: return

    plt.figure(figsize=(10, 5))
    plt.plot(pdf[x_col[level]], pdf["maxTemperatur"], "red", label="Max")
    plt.plot(pdf[x_col[level]], pdf["avgTemperatur"], "green", label="Avg")
    plt.plot(pdf[x_col[level]], pdf["minTemperatur"], "blue", label="Min")
    plt.title(f"{level.capitalize()}werte")
    plt.legend()
    plt.show()

# --- Aufgabe B: Tempmonat and Ranking ---

def create_tempmonat(spark: SparkSession):
    """Joins stations and data to create monthly aggregates using a CTE."""
    query = """
    WITH base_data AS (
        SELECT
            d.stationId,
            gs.station_name AS stationsname,
            d.TT_TU,
            TO_DATE(SUBSTR(d.date, 1, 4), 'yyyy') AS jahr_val,
            TO_DATE(CONCAT(SUBSTR(d.date, 1, 4), '-', SUBSTR(d.date, 5, 2), '-01')) AS monat_val
        FROM german_stations_data d
        JOIN german_stations gs ON d.stationId = gs.stationId
        WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999
    )
    SELECT
        stationId,
        stationsname,
        MIN(TT_TU) AS minTemperatur,
        MAX(TT_TU) AS maxTemperatur,
        AVG(TT_TU) AS avgTemperatur,
        jahr_val AS jahr,
        monat_val AS monat
    FROM base_data
    GROUP BY stationId, stationsname, jahr_val, monat_val
    """
    spark.sql(query).cache().createOrReplaceTempView("tempmonat")

def rank_temperatures(spark: SparkSession, limit: int, year: int = None):
    """Musterlösung 'rankMinMaxAvgTemp2015' and 'rankMinMaxAvgTempYears'."""
    where_clause = f"WHERE YEAR(jahr) = {year}" if year else ""
    query = f"""
    SELECT stationid, stationsname, monat, minTemperatur,
        RANK() OVER (ORDER BY minTemperatur ASC) AS rangMIN,
        maxTemperatur,
        RANK() OVER (ORDER BY maxTemperatur DESC) AS rangMAX,
        avgTemperatur,
        RANK() OVER (ORDER BY avgTemperatur DESC) AS rangAVG
    FROM tempmonat
    {where_clause}
    ORDER BY rangMIN
    """
    spark.sql(query).show(limit, truncate=False)

# --- Aufgabe C: Grouping Sets ---

def create_grouping_sets_view(spark: SparkSession):
    """Computes grouping sets using a CTE to avoid Missing Aggregation errors."""
    query = """
    WITH base_gs AS (
        SELECT
            d.stationId,
            gs.bundesland,
            d.TT_TU,
            YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS jahr_val,
            CONCAT(SUBSTR(d.date, 7, 2), '-', SUBSTR(d.date, 5, 2)) AS monat_val
        FROM german_stations_data d
        JOIN german_stations gs ON d.stationId = gs.stationId
        WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999
    )
    SELECT
        stationId,
        bundesland,
        jahr_val AS jahr,
        monat_val AS monat,
        MIN(TT_TU) AS minTemperatur,
        MAX(TT_TU) AS maxTemperatur,
        AVG(TT_TU) AS avgTemperatur
    FROM base_gs
    GROUP BY GROUPING SETS (
        (bundesland, jahr_val),
        (stationId, jahr_val),
        (bundesland, monat_val)
    )
    """
    spark.sql(query).cache().createOrReplaceTempView("tempmma_gs")

def show_seperate_gs(spark: SparkSession, limit: int, metric: str):
    """Musterlösung 'showMinMaxAvgSeperate'."""
    aggs = [
        ("bundesland", "jahr"),
        ("stationId", "jahr"),
        ("bundesland", "monat")
    ]
    for col1, col2 in aggs:
        print(f"Aggregation: {col1} & {col2}")
        q = f"SELECT {col1}, {col2}, {metric} FROM tempmma_gs WHERE {col1} IS NOT NULL AND {col2} IS NOT NULL ORDER BY {metric}"
        spark.sql(q).show(limit, truncate=False)

def main(scon, spark):
    read_parquet_tables(spark)

    # Kempten ID = 2559
    create_mma_rollup(spark, 2559)
    for level in ["years", "quartals", "months", "days"]:
        plot_date_values(spark, level)

    create_tempmonat(spark)
    print("Rangfolgen 2015:")
    rank_temperatures(spark, 18, 2015)
    print("Rangfolgen Gesamt:")
    rank_temperatures(spark, 18)

    create_grouping_sets_view(spark)
    show_seperate_gs(spark, 5, "minTemperatur")

if __name__ == "__main__":
    main(scon, spark)

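Editor's note on the ROLLUP filters: the new plot_date_values selects one aggregation level from the ROLLUP result purely via NULL checks, because GROUP BY ROLLUP emits one row per aggregation level and sets the rolled-up columns to NULL in the coarser rows. Below is a minimal local sketch of that mechanism; it is not part of the commit, and the local session, demo table, and sample values are assumptions made purely for illustration.

from pyspark.sql import SparkSession

# Local session only for the demo; the exercise scripts use the cluster session from sparkstart.
spark = SparkSession.builder.master("local[*]").appName("rollup-demo").getOrCreate()

rows = [(2020, 1, 1, 3.5), (2020, 1, 2, 4.0), (2020, 2, 1, 5.5), (2021, 1, 1, 1.0)]
spark.createDataFrame(rows, ["jahr", "monat", "tag", "TT_TU"]).createOrReplaceTempView("demo_data")

# One row per aggregation level; the columns that were rolled up come back as NULL.
rollup_df = spark.sql("""
    SELECT jahr, monat, tag,
           MIN(TT_TU) AS minTemperatur,
           MAX(TT_TU) AS maxTemperatur,
           AVG(TT_TU) AS avgTemperatur
    FROM demo_data
    GROUP BY ROLLUP (jahr, monat, tag)
    ORDER BY jahr, monat, tag
""")
rollup_df.createOrReplaceTempView("demo_rollup")
rollup_df.show()

# Yearly rows only: the finer columns are NULL, the year column is not.
# This is the same pattern the "years" entry of the filters dict relies on.
spark.sql("""
    SELECT jahr, avgTemperatur
    FROM demo_rollup
    WHERE monat IS NULL AND tag IS NULL AND jahr IS NOT NULL
    ORDER BY jahr
""").show()

If the grouping columns themselves could contain NULLs, the GROUPING() function would be the safer way to tell aggregate rows apart; for the cleaned station data the NULL checks are sufficient.
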
Aufgabe 12/Aufgabe12.py Normal file

@@ -0,0 +1,276 @@
from __future__ import annotations
from typing import Iterable, Sequence
from pyspark.sql import SparkSession, functions as F, types as T
from sparkstart import scon, spark
HDFSPATH = "hdfs://193.174.205.250:54310/"
_DATE_FALLBACK_EXPR = "COALESCE(date_value, TO_DATE(date_str), TO_DATE(date_str, 'yyyyMMdd'))"
def _resolve_column_name(columns: Sequence[str], candidates: Iterable[str]) -> str:
lowered = {col.lower(): col for col in columns}
for candidate in candidates:
match = lowered.get(candidate.lower())
if match:
return match
raise ValueError(f"None of the candidate columns {list(candidates)} exist in {columns}")
def _normalize_stocks_view(spark: SparkSession) -> None:
stocks_path = HDFSPATH + "stocks/stocks.parquet"
stocks_df = spark.read.parquet(stocks_path)
symbol_col = _resolve_column_name(stocks_df.columns, ("symbol", "ticker"))
date_col = _resolve_column_name(stocks_df.columns, ("date", "pricedate", "dt"))
close_col = _resolve_column_name(stocks_df.columns, ("close", "closeprice", "closingprice"))
stocks_df = (
stocks_df
.select(
F.col(symbol_col).alias("symbol"),
F.col(date_col).alias("raw_date"),
F.col(close_col).alias("close_raw"),
)
.withColumn("date_str", F.col("raw_date").cast("string"))
)
date_candidates = [
F.col("raw_date").cast("date"),
F.to_date("raw_date"),
F.to_date("date_str"),
F.to_date("date_str", "yyyyMMdd"),
F.to_date("date_str", "MM/dd/yyyy"),
]
stocks_df = (
stocks_df
.withColumn("date_value", F.coalesce(*date_candidates))
.withColumn("year_value", F.substring("date_str", 1, 4).cast("int"))
.withColumn("close_value", F.col("close_raw").cast("double"))
.select("symbol", "date_value", "date_str", "year_value", "close_value")
)
stocks_df.cache()
stocks_df.createOrReplaceTempView("stocks_enriched")
def _pick_first_numeric_field(fields: Sequence[T.StructField]) -> str:
numeric_types = (
T.ByteType,
T.ShortType,
T.IntegerType,
T.LongType,
T.FloatType,
T.DoubleType,
T.DecimalType,
)
for field in fields:
if isinstance(field.dataType, numeric_types):
return field.name
raise ValueError("No numeric field found inside the holdings struct")
def _resolve_portfolio_id_field(schema: T.StructType) -> str:
priority = ("portfolio_id", "portfolioid", "id")
lowered = {field.name.lower(): field.name for field in schema.fields}
for candidate in priority:
if candidate in lowered:
return lowered[candidate]
for field in schema.fields:
if not isinstance(field.dataType, (T.ArrayType, T.MapType)):
return field.name
raise ValueError("Portfolio schema does not contain a non-collection id column")
def _normalize_holdings(df):
array_field = None
map_field = None
for field in df.schema.fields:
if isinstance(field.dataType, T.ArrayType) and isinstance(field.dataType.elementType, T.StructType):
array_field = field
break
if isinstance(field.dataType, T.MapType) and isinstance(field.dataType.keyType, T.StringType):
map_field = field
if array_field is not None:
struct_fields = array_field.dataType.elementType.fields
symbol_field = _resolve_column_name([f.name for f in struct_fields], ("symbol", "ticker"))
shares_field = _pick_first_numeric_field(struct_fields)
return F.expr(
f"transform(`{array_field.name}`, x -> named_struct('symbol', x.`{symbol_field}`, 'shares', CAST(x.`{shares_field}` AS DOUBLE)))"
)
if map_field is not None and isinstance(map_field.dataType.valueType, (T.IntegerType, T.LongType, T.FloatType, T.DoubleType, T.DecimalType)):
return F.expr(
f"transform(map_entries(`{map_field.name}`), x -> named_struct('symbol', x.key, 'shares', CAST(x.value AS DOUBLE)))"
)
raise ValueError("Could not locate holdings column (array<struct> or map) in portfolio data")
def _normalize_portfolio_view(spark: SparkSession) -> None:
portfolio_path = HDFSPATH + "stocks/portfolio.parquet"
portfolio_df = spark.read.parquet(portfolio_path)
id_col = _resolve_portfolio_id_field(portfolio_df.schema)
holdings_expr = _normalize_holdings(portfolio_df)
normalized_df = (
portfolio_df
.select(
F.col(id_col).alias("portfolio_id"),
holdings_expr.alias("holdings"),
)
)
normalized_df.cache()
normalized_df.createOrReplaceTempView("portfolio")
spark.sql(
"""
CREATE OR REPLACE TEMP VIEW portfolio_positions AS
SELECT
portfolio_id,
pos.symbol AS symbol,
pos.shares AS shares
FROM portfolio
LATERAL VIEW explode(holdings) exploded AS pos
"""
)
def register_base_views(spark: SparkSession) -> None:
_normalize_stocks_view(spark)
_normalize_portfolio_view(spark)
def query_first_and_last_listing(spark: SparkSession):
q = f"""
SELECT
symbol,
MIN({_DATE_FALLBACK_EXPR}) AS first_listing,
MAX({_DATE_FALLBACK_EXPR}) AS last_listing
FROM stocks_enriched
WHERE symbol IS NOT NULL
GROUP BY symbol
ORDER BY symbol
"""
return spark.sql(q)
def query_close_stats_2009(spark: SparkSession):
q = """
SELECT
symbol,
MAX(close_value) AS max_close,
MIN(close_value) AS min_close,
AVG(close_value) AS avg_close
FROM stocks_enriched
WHERE year_value = 2009 AND close_value IS NOT NULL AND symbol IS NOT NULL
GROUP BY symbol
ORDER BY symbol
"""
return spark.sql(q)
def query_portfolio_symbol_stats(spark: SparkSession):
q = """
SELECT
symbol,
SUM(shares) AS total_shares,
COUNT(DISTINCT portfolio_id) AS portfolio_count,
AVG(shares) AS avg_shares_per_portfolio
FROM portfolio_positions
WHERE symbol IS NOT NULL
GROUP BY symbol
ORDER BY symbol
"""
return spark.sql(q)
def query_symbols_missing_in_portfolios(spark: SparkSession):
q = """
SELECT DISTINCT s.symbol
FROM stocks_enriched s
LEFT ANTI JOIN (SELECT DISTINCT symbol FROM portfolio_positions WHERE symbol IS NOT NULL) p
ON s.symbol = p.symbol
WHERE s.symbol IS NOT NULL
ORDER BY s.symbol
"""
return spark.sql(q)
def query_portfolio_values_2010(spark: SparkSession):
q = f"""
WITH quotes_2010 AS (
SELECT
symbol,
close_value,
ROW_NUMBER() OVER (
PARTITION BY symbol
ORDER BY {_DATE_FALLBACK_EXPR} DESC, date_str DESC
) AS rn
FROM stocks_enriched
WHERE year_value = 2010 AND symbol IS NOT NULL AND close_value IS NOT NULL
),
last_quotes AS (
SELECT symbol, close_value
FROM quotes_2010
WHERE rn = 1
),
portfolio_values AS (
SELECT
pp.portfolio_id,
SUM(pp.shares * lq.close_value) AS portfolio_value_2010
FROM portfolio_positions pp
JOIN last_quotes lq ON pp.symbol = lq.symbol
GROUP BY pp.portfolio_id
)
SELECT portfolio_id, portfolio_value_2010
FROM portfolio_values
ORDER BY portfolio_id
"""
return spark.sql(q)
def main(scon, spark):
register_base_views(spark)
print("(a) Erste und letzte Notierung je Symbol:")
query_first_and_last_listing(spark).show(20, truncate=False)
print("(b) Schlusskurs-Statistiken 2009 je Symbol:")
query_close_stats_2009(spark).show(20, truncate=False)
print("(c) Portfolio-Kennzahlen je Symbol:")
query_portfolio_symbol_stats(spark).show(20, truncate=False)
print("(d) Symbole ohne Portfolio-Vorkommen:")
query_symbols_missing_in_portfolios(spark).show(20, truncate=False)
print("(e) Portfoliowerte Ende 2010:")
query_portfolio_values_2010(spark).show(20, truncate=False)
if __name__ == "__main__":
main(scon, spark)
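
Editor's note on the holdings normalisation: Aufgabe12.py first brings whatever shape the portfolio holdings arrive in (an array of structs or a map) into a single portfolio_positions view and then runs plain SQL against it. The following is a small local sketch of that idea, not part of the commit; the local session, schemas, and sample values are assumptions made for illustration.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[*]").appName("portfolio-demo").getOrCreate()

# Array-of-structs holdings explode directly into one row per position.
array_pf = spark.createDataFrame(
    [(1, [("AAPL", 10.0), ("MSFT", 5.0)]), (2, [("AAPL", 3.0)])],
    "portfolio_id INT, holdings ARRAY<STRUCT<symbol: STRING, shares: DOUBLE>>",
)
array_pf.createOrReplaceTempView("portfolio")

# Same shape as the script's portfolio_positions view.
spark.sql("""
    SELECT portfolio_id, pos.symbol AS symbol, pos.shares AS shares
    FROM portfolio
    LATERAL VIEW explode(holdings) exploded AS pos
""").show()

# A map<string, double> column can be converted to the same array<struct> shape,
# which is what _normalize_holdings does for the map case.
map_pf = spark.createDataFrame(
    [(3, {"AAPL": 7.0})],
    "portfolio_id INT, holdings MAP<STRING, DOUBLE>",
)
map_pf.withColumn(
    "holdings",
    F.expr("transform(map_entries(holdings), x -> named_struct('symbol', x.key, 'shares', CAST(x.value AS DOUBLE)))"),
).show(truncate=False)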

Aufgabe 12/sparkstart.py Normal file

@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
Erzeugen einer Spark-Konfiguration
"""
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# connect to cluster
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.executor.memory", '32g')
conf.set("spark.driver.memory", '8g')
conf.set("spark.cores.max", "40")
scon = SparkContext(conf=conf)
spark = SparkSession \
.builder \
.appName("Python Spark SQL") \
.getOrCreate()
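
Editor's note: getOrCreate() above reuses the SparkContext already created from conf, so the cluster settings do apply even though the builder itself sets only the app name. An equivalent construction through a single builder call, as a sketch rather than a change to the committed file, would look like this:

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.executor.memory", "32g")
conf.set("spark.driver.memory", "8g")
conf.set("spark.cores.max", "40")

# Build the session directly from the SparkConf and take the context from it,
# so the same two objects (spark, scon) are available for the exercise scripts.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
scon = spark.sparkContext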