diff --git a/Aufgabe 10/Aufgabe10.py b/Aufgabe 10/Aufgabe10.py index 0aa3b52..d7763f9 100644 --- a/Aufgabe 10/Aufgabe10.py +++ b/Aufgabe 10/Aufgabe10.py @@ -5,7 +5,6 @@ import matplotlib.pyplot as plt HDFSPATH = "hdfs://193.174.205.250:54310/" - def read_parquets(spark: SparkSession): stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet" products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet" diff --git a/Aufgabe 11/Aufgabe11.py b/Aufgabe 11/Aufgabe11.py index 82631cf..569050e 100644 --- a/Aufgabe 11/Aufgabe11.py +++ b/Aufgabe 11/Aufgabe11.py @@ -1,366 +1,187 @@ +from __future__ import annotations from sparkstart import scon, spark from pyspark.sql import SparkSession -import time import matplotlib.pyplot as plt -import pandas as pd -HDFSPATH_STATIONS = "hdfs://193.174.205.250:54310/home/heiserervalentin/" -HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/" +HDFSPATH = "hdfs://193.174.205.250:54310/" -def init_view_stations(spark): - """Lädt die Stationsdaten für Aufgabe 11""" - s_path = HDFSPATH_STATIONS + "german_stations.parquet" - d_path = HDFSPATH_STATIONS + "german_stations_data.parquet" - - spark.read.parquet(s_path).createOrReplaceTempView("german_stations") - spark.read.parquet(d_path).createOrReplaceTempView("german_stations_data") +def read_parquet_tables(spark: SparkSession) -> None: + # Use your specific paths from Aufgabe 9 + stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet" + products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet" -def init_view_stocks(spark): - """Lädt die Stocks-Daten für Aufgabe 12""" - # Hinweis: Pfade anpassen, falls sie im Cluster anders liegen - spark.read.parquet(HDFSPATH_STOCKS + "stocks.parquet").createOrReplaceTempView("stocks") - spark.read.parquet(HDFSPATH_STOCKS + "portfolio.parquet").createOrReplaceTempView("portfolio") + spark.read.parquet(stations_path).createOrReplaceTempView("german_stations") + spark.read.parquet(products_path).createOrReplaceTempView("german_stations_data") -# --------------------------------------------------------- -# AUFGABE 11 -# --------------------------------------------------------- +# --- Aufgabe A: Rollup and Plotting --- -def task_11a_rollup(spark: SparkSession, station_name="Kempten"): - print(f"\n--- Aufgabe 11a: Rollup & Plotting für {station_name} ---") - start = time.time() - - # 1. Station ID finden - sid_df = spark.sql(f"SELECT stationId FROM german_stations WHERE station_name LIKE '%{station_name}%'") - try: - sid = sid_df.collect()[0]['stationId'] - except IndexError: - print(f"Station {station_name} nicht gefunden.") - return - - # 2. Rollup Query vorbereiten - q_prep = f""" - SELECT - YEAR(date) as yr, - QUARTER(date) as qt, - MONTH(date) as mo, - DAY(date) as da, - TT_TU - FROM german_stations_data - WHERE stationId = {sid} AND TT_TU IS NOT NULL AND TT_TU > -50 +def create_mma_rollup(spark: SparkSession, station_id: int): """ - spark.sql(q_prep).createOrReplaceTempView("data_prep") - - q_rollup = """ - SELECT - yr, qt, mo, da, - MIN(TT_TU) as min_temp, - MAX(TT_TU) as max_temp, - AVG(TT_TU) as avg_temp, - - -- Datums-Konstruktion für Plots - DATE(STRING(yr) || '-' || STRING(qt*3-2) || '-01') as qt_date, - MAKE_DATE(yr, mo, 1) as mo_date, - MAKE_DATE(yr, 1, 1) as yr_date, - MAKE_DATE(yr, mo, da) as da_date - - FROM data_prep - GROUP BY ROLLUP(yr, qt, mo, da) + Functionally identical to Musterlösung 'createDataFrame'. + Uses your schema: TT_TU (temp), hour (messtunde), date (string yyyyMMdd). 
""" - - df_rollup = spark.sql(q_rollup) - df_rollup.cache() - df_rollup.createOrReplaceTempView("station_rollup") - - # Trigger Action for Cache & Time Measurement - count = df_rollup.count() - print(f"Rollup berechnet. Zeilen: {count}. Dauer: {time.time() - start:.2f}s") - input(">> 11a: Check Spark UI (Stages/Storage) jetzt. Enter für Plots...") - - # Plot 1: Tageswerte (letzte 3 Jahre der Daten) - q_days = """ - SELECT da_date as date, avg_temp - FROM station_rollup - WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NOT NULL - AND yr >= (SELECT MAX(yr) - 2 FROM station_rollup) - ORDER BY date + query = f""" + WITH processed_data AS ( + SELECT + TT_TU, + hour AS messtunde, + TO_DATE(SUBSTR(date, 1, 4), 'yyyy') AS jahr, + TO_DATE(CONCAT(SUBSTR(date, 1, 4), '-', LPAD(CAST(QUARTER(TO_DATE(date, 'yyyyMMdd'))*3-2 AS STRING), 2, '0'), '-01')) AS quartal, + TO_DATE(CONCAT(SUBSTR(date, 1, 4), '-', SUBSTR(date, 5, 2), '-01')) AS monat, + TO_DATE(date, 'yyyyMMdd') AS tag + FROM german_stations_data + WHERE stationId = {station_id} + AND TT_TU IS NOT NULL + AND TT_TU <> -999 + ) + SELECT + MIN(TT_TU) AS minTemperatur, + MAX(TT_TU) AS maxTemperatur, + AVG(TT_TU) AS avgTemperatur, + jahr, + quartal, + monat, + tag, + messtunde + FROM processed_data + GROUP BY ROLLUP (jahr, quartal, monat, tag, messtunde) + ORDER BY jahr, quartal, monat, tag, messtunde """ - pdf_days = spark.sql(q_days).toPandas() + df = spark.sql(query) + df.createOrReplaceTempView("mmacdcdata") + df.cache() + df.show(10) + +def plot_date_values(spark: SparkSession, level: str): + """Functionally identical plotting logic to Musterlösung 'plotDateValues'.""" + filters = { + "days": "YEAR(jahr) > 2017 AND YEAR(jahr) < 2021 AND messtunde IS NULL AND tag IS NOT NULL", + "months": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NOT NULL", + "quartals": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NULL AND quartal IS NOT NULL", + "years": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NULL AND quartal IS NULL AND jahr IS NOT NULL" + } + x_col = {"days": "tag", "months": "monat", "quartals": "quartal", "years": "jahr"} - plt.figure(1, figsize=(10, 5)) - plt.plot(pdf_days['date'], pdf_days['avg_temp'], label='Daily Avg', linewidth=0.5) - plt.title(f"{station_name}: Daily Average (Last 3 Years)") - plt.xlabel('Date') - plt.ylabel('Temp °C') - plt.tight_layout() - plt.show() + pdf = spark.sql(f"SELECT * FROM mmacdcdata WHERE {filters[level]}").toPandas() + if pdf.empty: return - # Plot 2: Monatswerte (10-20 Jahre) - q_months = """ - SELECT mo_date as date, avg_temp - FROM station_rollup - WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NULL - AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) - ORDER BY date - """ - pdf_months = spark.sql(q_months).toPandas() - - plt.figure(2, figsize=(10, 5)) - plt.plot(pdf_months['date'], pdf_months['avg_temp'], color='green', label='Monthly Avg') - plt.title(f"{station_name}: Monthly Average (Last 20 Years)") - plt.xlabel('Date') - plt.ylabel('Temp °C') - plt.tight_layout() - plt.show() - - # Plot 3: Quartalswerte - q_quarters = """ - SELECT qt_date as date, avg_temp - FROM station_rollup - WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NULL AND da IS NULL - AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) - ORDER BY date - """ - pdf_quarters = spark.sql(q_quarters).toPandas() - - plt.figure(3, figsize=(10, 5)) - plt.plot(pdf_quarters['date'], pdf_quarters['avg_temp'], color='orange', 
marker='o', linestyle='-', label='Quarterly Avg') - plt.title(f"{station_name}: Quarterly Average (Last 20 Years)") - plt.show() - - # Plot 4: Jahreswerte - q_years = """ - SELECT yr_date as date, min_temp, max_temp, avg_temp - FROM station_rollup - WHERE yr IS NOT NULL AND qt IS NULL AND mo IS NULL AND da IS NULL - AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup) - ORDER BY date - """ - pdf_years = spark.sql(q_years).toPandas() - - plt.figure(4, figsize=(10, 5)) - plt.plot(pdf_years['date'], pdf_years['max_temp'], color='red', label='Max') - plt.plot(pdf_years['date'], pdf_years['avg_temp'], color='black', label='Avg') - plt.plot(pdf_years['date'], pdf_years['min_temp'], color='blue', label='Min') - plt.title(f"{station_name}: Yearly Aggregates (Last 20 Years)") + plt.figure(figsize=(10, 5)) + plt.plot(pdf[x_col[level]], pdf["maxTemperatur"], "red", label="Max") + plt.plot(pdf[x_col[level]], pdf["avgTemperatur"], "green", label="Avg") + plt.plot(pdf[x_col[level]], pdf["minTemperatur"], "blue", label="Min") + plt.title(f"{level.capitalize()}werte") plt.legend() + plt.grid(True) plt.show() +# --- Aufgabe B: Tempmonat and Ranking --- -def task_11b_rank(spark: SparkSession): - print("\n--- Aufgabe 11b: TempMonat Ranking ---") - start = time.time() - - q_tempmonat = """ +def create_tempmonat(spark: SparkSession): + """Joins stations and data to create monthly aggregates using a CTE.""" + query = """ + WITH base_data AS ( SELECT d.stationId, - s.station_name, - SUBSTR(CAST(d.date AS STRING), 1, 4) as year, - SUBSTR(CAST(d.date AS STRING), 6, 2) as month, - MIN(d.TT_TU) as min_t, - MAX(d.TT_TU) as max_t, - AVG(d.TT_TU) as avg_t + gs.station_name AS stationsname, + d.TT_TU, + TO_DATE(SUBSTR(d.date, 1, 4), 'yyyy') AS jahr_val, + TO_DATE(CONCAT(SUBSTR(d.date, 1, 4), '-', SUBSTR(d.date, 5, 2), '-01')) AS monat_val FROM german_stations_data d - JOIN german_stations s ON d.stationId = s.stationId - WHERE d.TT_TU IS NOT NULL AND d.TT_TU > -50 - GROUP BY d.stationId, s.station_name, year, month + JOIN german_stations gs ON d.stationId = gs.stationId + WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999 + ) + SELECT + stationId, + stationsname, + MIN(TT_TU) AS minTemperatur, + MAX(TT_TU) AS maxTemperatur, + AVG(TT_TU) AS avgTemperatur, + jahr_val AS jahr, + monat_val AS monat + FROM base_data + GROUP BY stationId, stationsname, jahr_val, monat_val """ - df_tm = spark.sql(q_tempmonat) - df_tm.createOrReplaceTempView("tempmonat") - - # 1. 
Ranking Partitioniert nach Monat im Jahr 2015 - print(" > Berechne Ranking für 2015 (partitioniert nach Monat)...") - q_rank_2015 = """ - SELECT - month, station_name, min_t, - RANK() OVER (PARTITION BY month ORDER BY min_t ASC) as rank_min, - RANK() OVER (PARTITION BY month ORDER BY max_t ASC) as rank_max, - RANK() OVER (PARTITION BY month ORDER BY avg_t ASC) as rank_avg - FROM tempmonat - WHERE year = '2015' - ORDER BY rank_min, month + spark.sql(query).cache().createOrReplaceTempView("tempmonat") + +def rank_temperatures(spark: SparkSession, limit: int, year: int = None): + """Musterlösung 'rankMinMaxAvgTemp2015' and 'rankMinMaxAvgTempYears'.""" + where_clause = f"WHERE YEAR(jahr) = {year}" if year else "" + query = f""" + SELECT stationid, stationsname, monat, minTemperatur, + RANK() OVER (ORDER BY minTemperatur ASC) AS rangMIN, + maxTemperatur, + RANK() OVER (ORDER BY maxTemperatur DESC) AS rangMAX, + avgTemperatur, + RANK() OVER (ORDER BY avgTemperatur DESC) AS rangAVG + FROM tempmonat + {where_clause} + ORDER BY rangMIN """ - spark.sql(q_rank_2015).show(10) + spark.sql(query).show(limit, truncate=False) - # 2. Globales Ranking (über alle Monate/Jahre hinweg) - print(" > Berechne Ranking global (kälteste Monate aller Zeiten)...") - q_rank_global = """ +# --- Aufgabe C: Grouping Sets --- + +def create_grouping_sets_view(spark: SparkSession): + """Computes grouping sets using a CTE to avoid Missing Aggregation errors.""" + query = """ + WITH base_gs AS ( SELECT - year, month, station_name, min_t, - RANK() OVER (ORDER BY min_t ASC) as rank_min, - RANK() OVER (ORDER BY max_t ASC) as rank_max, - RANK() OVER (ORDER BY avg_t ASC) as rank_avg - FROM tempmonat - ORDER BY rank_min - """ - spark.sql(q_rank_global).show(10) - - print(f"Dauer 11b: {time.time() - start:.2f}s") - input(">> 11b: Check Spark UI (Jobs/Stages). Enter...") - - -def task_11c_groupingsets(spark: SparkSession): - print("\n--- Aufgabe 11c: Grouping Sets ---") - start = time.time() - - q_prep = """ - SELECT - CAST(SUBSTR(CAST(d.date AS STRING), 1, 4) AS INT) as year, - CAST(SUBSTR(CAST(d.date AS STRING), 6, 2) AS INT) as month, - s.station_name, - s.bundesland, - d.TT_TU + d.stationId, + gs.bundesland, + d.TT_TU, + YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS jahr_val, + CONCAT(SUBSTR(d.date, 7, 2), '-', SUBSTR(d.date, 5, 2)) AS monat_val FROM german_stations_data d - JOIN german_stations s ON d.stationId = s.stationId - WHERE d.TT_TU > -50 + JOIN german_stations gs ON d.stationId = gs.stationId + WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999 + ) + SELECT + stationId, + bundesland, + jahr_val AS jahr, + monat_val AS monat, + MIN(TT_TU) AS minTemperatur, + MAX(TT_TU) AS maxTemperatur, + AVG(TT_TU) AS avgTemperatur + FROM base_gs + GROUP BY GROUPING SETS ( + (bundesland, jahr_val), + (stationId, jahr_val), + (bundesland, monat_val) + ) """ - spark.sql(q_prep).createOrReplaceTempView("gs_base") + spark.sql(query).cache().createOrReplaceTempView("tempmma_gs") - q_sets = """ - SELECT - year, - month, - bundesland, - station_name, - MIN(TT_TU) as min_t, MAX(TT_TU) as max_t, AVG(TT_TU) as avg_t - FROM gs_base - GROUP BY GROUPING SETS ( - (year, bundesland), - (year, station_name), - (month, bundesland) - ) - """ - df_gs = spark.sql(q_sets) - df_gs.cache() - df_gs.createOrReplaceTempView("grouping_result") - - # Action zum Cachen - df_gs.count() - print(f"Grouping Sets berechnet. 
Dauer: {time.time() - start:.2f}s") - - print("Auswahl 1: Jahr & Bundesland") - spark.sql("SELECT year, bundesland, avg_t FROM grouping_result WHERE station_name IS NULL AND month IS NULL ORDER BY year DESC, bundesland").show(5) - - print("Auswahl 2: Jahr & Station") - spark.sql("SELECT year, station_name, avg_t FROM grouping_result WHERE bundesland IS NULL AND month IS NULL ORDER BY year DESC, station_name").show(5) - - print("Auswahl 3: Monat & Bundesland (Jahreszeitlicher Verlauf je Land)") - spark.sql("SELECT month, bundesland, avg_t FROM grouping_result WHERE year IS NULL AND station_name IS NULL ORDER BY bundesland, month").show(5) - - input(">> 11c: Check Spark UI (Zugriffspläne/Storage). Enter...") - - -# --------------------------------------------------------- -# AUFGABE 12 -# --------------------------------------------------------- - -def task_12_stocks_analysis(spark: SparkSession): - print("\n--- Aufgabe 12: Stocks & Portfolio ---") - - # a) Erstes und letztes Datum je Symbol - print("a) Min/Max Datum pro Symbol") - t0 = time.time() - q_a = """ - SELECT symbol, MIN(date) as first_date, MAX(date) as last_date - FROM stocks - GROUP BY symbol - ORDER BY symbol - """ - spark.sql(q_a).show(5) - print(f"Zeit a): {time.time()-t0:.2f}s") - - # b) Aggregationen 2009 - print("\nb) High/Low/Avg Close 2009") - t0 = time.time() - q_b = """ - SELECT symbol, MAX(close) as max_close, MIN(close) as min_close, AVG(close) as avg_close - FROM stocks - WHERE YEAR(date) = 2009 - GROUP BY symbol - ORDER BY symbol - """ - spark.sql(q_b).show(5) - print(f"Zeit b): {time.time()-t0:.2f}s") - - # c) Lateral View (Explode Portfolio) - print("\nc) Lateral View: Aktien in Portfolios") - t0 = time.time() - - q_c = """ - SELECT - h.symbol, - SUM(h.amount) as total_shares, - COUNT(p.portfolioId) as num_portfolios, - AVG(h.amount) as avg_per_portfolio - FROM portfolio p - LATERAL VIEW explode(holdings) t AS h - GROUP BY h.symbol - ORDER BY h.symbol - """ - spark.sql(q_c).show(5) - print(f"Zeit c): {time.time()-t0:.2f}s") - - # d) Symbole in keinem Portfolio (Anti Join) - print("\nd) Symbole ohne Portfolio") - t0 = time.time() - q_d = """ - SELECT DISTINCT s.symbol - FROM stocks s - LEFT ANTI JOIN ( - SELECT DISTINCT h.symbol - FROM portfolio p - LATERAL VIEW explode(holdings) t AS h - ) p_sym ON s.symbol = p_sym.symbol - ORDER BY s.symbol - """ - spark.sql(q_d).show(5) - print(f"Zeit d): {time.time()-t0:.2f}s") - - input(">> 12 a-d fertig. Check UI. 
Enter für e)...") - - # e) Portfolio Wert Ende 2010 - print("\ne) Portfolio Bewertung Ende 2010") - t0 = time.time() - - q_last_price = """ - SELECT symbol, close - FROM ( - SELECT - symbol, - close, - ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY date DESC) as rn - FROM stocks - WHERE YEAR(date) = 2010 - ) tmp - WHERE rn = 1 - """ - spark.sql(q_last_price).createOrReplaceTempView("stocks_2010_end") - - # Schritt 2: Portfolio explodieren, mit Preis joinen, berechnen, summieren - q_val = """ - SELECT - p.portfolioId, - SUM(h.amount * s.close) as portfolio_value_2010 - FROM portfolio p - LATERAL VIEW explode(holdings) t AS h - JOIN stocks_2010_end s ON h.symbol = s.symbol - GROUP BY p.portfolioId - ORDER BY p.portfolioId - """ - spark.sql(q_val).show(5) - print(f"Zeit e): {time.time()-t0:.2f}s") +def show_seperate_gs(spark: SparkSession, limit: int, metric: str): + """Musterlösung 'showMinMaxAvgSeperate'.""" + aggs = [ + ("bundesland", "jahr"), + ("stationId", "jahr"), + ("bundesland", "monat") + ] + for col1, col2 in aggs: + print(f"Aggregation: {col1} & {col2}") + q = f"SELECT {col1}, {col2}, {metric} FROM tempmma_gs WHERE {col1} IS NOT NULL AND {col2} IS NOT NULL ORDER BY {metric}" + spark.sql(q).show(limit, truncate=False) +# --- Execution --- def main(scon, spark): - init_view_stations(spark) + read_parquet_tables(spark) - # Aufgabe 11 - task_11a_rollup(spark, station_name="Kempten") - task_11b_rank(spark) - task_11c_groupingsets(spark) + # Kempten ID = 2559 + create_mma_rollup(spark, 2559) + for level in ["years", "quartals", "months", "days"]: + plot_date_values(spark, level) + + create_tempmonat(spark) + print("Rangfolgen 2015:") + rank_temperatures(spark, 18, 2015) + print("Rangfolgen Gesamt:") + rank_temperatures(spark, 18) + + create_grouping_sets_view(spark) + show_seperate_gs(spark, 5, "minTemperatur") - # Aufgabe 12 - init_view_stocks(spark) - task_12_stocks_analysis(spark) - -if __name__ == '__main__': +if __name__ == "__main__": main(scon, spark) \ No newline at end of file diff --git a/Aufgabe 12/Aufgabe12.py b/Aufgabe 12/Aufgabe12.py new file mode 100644 index 0000000..dd9531f --- /dev/null +++ b/Aufgabe 12/Aufgabe12.py @@ -0,0 +1,276 @@ +from __future__ import annotations + +from typing import Iterable, Sequence + +from pyspark.sql import SparkSession, functions as F, types as T + +from sparkstart import scon, spark + + +HDFSPATH = "hdfs://193.174.205.250:54310/" + + +_DATE_FALLBACK_EXPR = "COALESCE(date_value, TO_DATE(date_str), TO_DATE(date_str, 'yyyyMMdd'))" + + +def _resolve_column_name(columns: Sequence[str], candidates: Iterable[str]) -> str: + + lowered = {col.lower(): col for col in columns} + for candidate in candidates: + match = lowered.get(candidate.lower()) + if match: + return match + raise ValueError(f"None of the candidate columns {list(candidates)} exist in {columns}") + + +def _normalize_stocks_view(spark: SparkSession) -> None: + + stocks_path = HDFSPATH + "stocks/stocks.parquet" + stocks_df = spark.read.parquet(stocks_path) + + symbol_col = _resolve_column_name(stocks_df.columns, ("symbol", "ticker")) + date_col = _resolve_column_name(stocks_df.columns, ("date", "pricedate", "dt")) + close_col = _resolve_column_name(stocks_df.columns, ("close", "closeprice", "closingprice")) + + stocks_df = ( + stocks_df + .select( + F.col(symbol_col).alias("symbol"), + F.col(date_col).alias("raw_date"), + F.col(close_col).alias("close_raw"), + ) + .withColumn("date_str", F.col("raw_date").cast("string")) + ) + + date_candidates = [ + F.col("raw_date").cast("date"), + 
F.to_date("raw_date"), + F.to_date("date_str"), + F.to_date("date_str", "yyyyMMdd"), + F.to_date("date_str", "MM/dd/yyyy"), + ] + + stocks_df = ( + stocks_df + .withColumn("date_value", F.coalesce(*date_candidates)) + .withColumn("year_value", F.substring("date_str", 1, 4).cast("int")) + .withColumn("close_value", F.col("close_raw").cast("double")) + .select("symbol", "date_value", "date_str", "year_value", "close_value") + ) + + stocks_df.cache() + stocks_df.createOrReplaceTempView("stocks_enriched") + + +def _pick_first_numeric_field(fields: Sequence[T.StructField]) -> str: + + numeric_types = ( + T.ByteType, + T.ShortType, + T.IntegerType, + T.LongType, + T.FloatType, + T.DoubleType, + T.DecimalType, + ) + for field in fields: + if isinstance(field.dataType, numeric_types): + return field.name + raise ValueError("No numeric field found inside the holdings struct") + + +def _resolve_portfolio_id_field(schema: T.StructType) -> str: + + priority = ("portfolio_id", "portfolioid", "id") + lowered = {field.name.lower(): field.name for field in schema.fields} + for candidate in priority: + if candidate in lowered: + return lowered[candidate] + + for field in schema.fields: + if not isinstance(field.dataType, (T.ArrayType, T.MapType)): + return field.name + raise ValueError("Portfolio schema does not contain a non-collection id column") + + +def _normalize_holdings(df): + + array_field = None + map_field = None + for field in df.schema.fields: + if isinstance(field.dataType, T.ArrayType) and isinstance(field.dataType.elementType, T.StructType): + array_field = field + break + if isinstance(field.dataType, T.MapType) and isinstance(field.dataType.keyType, T.StringType): + map_field = field + + if array_field is not None: + struct_fields = array_field.dataType.elementType.fields + symbol_field = _resolve_column_name([f.name for f in struct_fields], ("symbol", "ticker")) + shares_field = _pick_first_numeric_field(struct_fields) + return F.expr( + f"transform(`{array_field.name}`, x -> named_struct('symbol', x.`{symbol_field}`, 'shares', CAST(x.`{shares_field}` AS DOUBLE)))" + ) + + if map_field is not None and isinstance(map_field.dataType.valueType, (T.IntegerType, T.LongType, T.FloatType, T.DoubleType, T.DecimalType)): + return F.expr( + f"transform(map_entries(`{map_field.name}`), x -> named_struct('symbol', x.key, 'shares', CAST(x.value AS DOUBLE)))" + ) + + raise ValueError("Could not locate holdings column (array or map) in portfolio data") + + +def _normalize_portfolio_view(spark: SparkSession) -> None: + + portfolio_path = HDFSPATH + "stocks/portfolio.parquet" + portfolio_df = spark.read.parquet(portfolio_path) + + id_col = _resolve_portfolio_id_field(portfolio_df.schema) + holdings_expr = _normalize_holdings(portfolio_df) + + normalized_df = ( + portfolio_df + .select( + F.col(id_col).alias("portfolio_id"), + holdings_expr.alias("holdings"), + ) + ) + + normalized_df.cache() + normalized_df.createOrReplaceTempView("portfolio") + + spark.sql( + """ + CREATE OR REPLACE TEMP VIEW portfolio_positions AS + SELECT + portfolio_id, + pos.symbol AS symbol, + pos.shares AS shares + FROM portfolio + LATERAL VIEW explode(holdings) exploded AS pos + """ + ) + + +def register_base_views(spark: SparkSession) -> None: + + _normalize_stocks_view(spark) + _normalize_portfolio_view(spark) + + +def query_first_and_last_listing(spark: SparkSession): + + q = f""" + SELECT + symbol, + MIN({_DATE_FALLBACK_EXPR}) AS first_listing, + MAX({_DATE_FALLBACK_EXPR}) AS last_listing + FROM stocks_enriched + WHERE symbol 
IS NOT NULL + GROUP BY symbol + ORDER BY symbol + """ + return spark.sql(q) + + +def query_close_stats_2009(spark: SparkSession): + + q = """ + SELECT + symbol, + MAX(close_value) AS max_close, + MIN(close_value) AS min_close, + AVG(close_value) AS avg_close + FROM stocks_enriched + WHERE year_value = 2009 AND close_value IS NOT NULL AND symbol IS NOT NULL + GROUP BY symbol + ORDER BY symbol + """ + return spark.sql(q) + + +def query_portfolio_symbol_stats(spark: SparkSession): + + q = """ + SELECT + symbol, + SUM(shares) AS total_shares, + COUNT(DISTINCT portfolio_id) AS portfolio_count, + AVG(shares) AS avg_shares_per_portfolio + FROM portfolio_positions + WHERE symbol IS NOT NULL + GROUP BY symbol + ORDER BY symbol + """ + return spark.sql(q) + + +def query_symbols_missing_in_portfolios(spark: SparkSession): + + q = """ + SELECT DISTINCT s.symbol + FROM stocks_enriched s + LEFT ANTI JOIN (SELECT DISTINCT symbol FROM portfolio_positions WHERE symbol IS NOT NULL) p + ON s.symbol = p.symbol + WHERE s.symbol IS NOT NULL + ORDER BY s.symbol + """ + return spark.sql(q) + + +def query_portfolio_values_2010(spark: SparkSession): + + q = f""" + WITH quotes_2010 AS ( + SELECT + symbol, + close_value, + ROW_NUMBER() OVER ( + PARTITION BY symbol + ORDER BY {_DATE_FALLBACK_EXPR} DESC, date_str DESC + ) AS rn + FROM stocks_enriched + WHERE year_value = 2010 AND symbol IS NOT NULL AND close_value IS NOT NULL + ), + last_quotes AS ( + SELECT symbol, close_value + FROM quotes_2010 + WHERE rn = 1 + ), + portfolio_values AS ( + SELECT + pp.portfolio_id, + SUM(pp.shares * lq.close_value) AS portfolio_value_2010 + FROM portfolio_positions pp + JOIN last_quotes lq ON pp.symbol = lq.symbol + GROUP BY pp.portfolio_id + ) + SELECT portfolio_id, portfolio_value_2010 + FROM portfolio_values + ORDER BY portfolio_id + """ + return spark.sql(q) + + +def main(scon, spark): + + register_base_views(spark) + + print("(a) Erste und letzte Notierung je Symbol:") + query_first_and_last_listing(spark).show(20, truncate=False) + + print("(b) Schlusskurs-Statistiken 2009 je Symbol:") + query_close_stats_2009(spark).show(20, truncate=False) + + print("(c) Portfolio-Kennzahlen je Symbol:") + query_portfolio_symbol_stats(spark).show(20, truncate=False) + + print("(d) Symbole ohne Portfolio-Vorkommen:") + query_symbols_missing_in_portfolios(spark).show(20, truncate=False) + + print("(e) Portfoliowerte Ende 2010:") + query_portfolio_values_2010(spark).show(20, truncate=False) + + +if __name__ == "__main__": + main(scon, spark) diff --git a/Aufgabe 12/sparkstart.py b/Aufgabe 12/sparkstart.py new file mode 100644 index 0000000..bdb7010 --- /dev/null +++ b/Aufgabe 12/sparkstart.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- + +""" +Erzeugen einer Spark-Konfiguration +""" + +from pyspark import SparkConf, SparkContext +from pyspark.sql import SparkSession + +# connect to cluster +conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin") +conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") +conf.set("spark.executor.memory", '32g') +conf.set("spark.driver.memory", '8g') +conf.set("spark.cores.max", "40") +scon = SparkContext(conf=conf) + + +spark = SparkSession \ + .builder \ + .appName("Python Spark SQL") \ + .getOrCreate()
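
Note on the Aufgabe 11 rollup: plot_date_values picks the aggregation level purely through IS NULL / IS NOT NULL checks on the derived date columns, which only holds as long as those columns are never NULL in the base rows. Spark's GROUPING() function marks the level explicitly instead. The snippet below is a minimal, self-contained sketch of that idea on toy data; the local SparkSession, the demo_data view and the *_aggregated column names are illustrative assumptions, not part of the patch.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("rollup-grouping-demo").getOrCreate()

# Toy stand-in for the processed_data CTE in create_mma_rollup.
spark.createDataFrame(
    [("2020-01-01", 1, 2.5), ("2020-01-01", 2, 3.0), ("2020-01-02", 1, 1.0)],
    ["tag", "messtunde", "TT_TU"],
).createOrReplaceTempView("demo_data")

spark.sql("""
    SELECT tag, messtunde,
           AVG(TT_TU)          AS avgTemperatur,
           GROUPING(tag)       AS tag_aggregated,        -- 1 = column was rolled up away in this row
           GROUPING(messtunde) AS messtunde_aggregated
    FROM demo_data
    GROUP BY ROLLUP (tag, messtunde)
""").show()
# Daily rows are then tag_aggregated = 0 AND messtunde_aggregated = 1,
# instead of "messtunde IS NULL AND tag IS NOT NULL".

spark.stop()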
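
Note on the Aufgabe 12 helpers: _resolve_column_name and _normalize_holdings exist so the stocks and portfolio Parquet files may differ in column naming and in whether holdings arrive as an array of structs or as a map. A small local smoke test like the one below pins that contract down; the local SparkSession, the sample rows and the import path are assumptions for illustration (the directory name "Aufgabe 12" contains a space, so importing it as Aufgabe12 may need a sys.path or naming tweak), not something the patch provides.

from pyspark.sql import Row, SparkSession, functions as F

from Aufgabe12 import _normalize_holdings, _resolve_column_name  # assumed import path

spark = SparkSession.builder.master("local[1]").appName("aufgabe12-smoke").getOrCreate()

# Column resolution is case-insensitive against the candidate names.
assert _resolve_column_name(["Symbol", "Date", "Close"], ("symbol", "ticker")) == "Symbol"

# Portfolio variant with an array<struct> holdings column.
portfolio = spark.createDataFrame([
    Row(portfolio_id=1, holdings=[Row(symbol="AAPL", amount=10), Row(symbol="IBM", amount=5)]),
])
normalized = portfolio.select("portfolio_id", _normalize_holdings(portfolio).alias("holdings"))
(
    normalized
    .select("portfolio_id", F.explode("holdings").alias("pos"))
    .select("portfolio_id", "pos.symbol", "pos.shares")
    .show()
)
# Expected: one row per (portfolio_id, symbol) with shares cast to double.

spark.stop()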
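
Note on Aufgabe 12/sparkstart.py: the SparkSession is built without referencing conf or scon, so it only inherits the cluster settings because a SparkContext already exists in the same process. Passing the SparkConf to the builder makes that explicit. The following is a sketch reusing the same settings as the file, not a required change.

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = (
    SparkConf()
    .setMaster("spark://193.174.205.250:7077")
    .setAppName("HeisererValentin")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.executor.memory", "32g")
    .set("spark.driver.memory", "8g")
    .set("spark.cores.max", "40")
)

# The session carries the configuration; the SparkContext is derived from it.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
scon = spark.sparkContext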