"""Spark SQL exercises over the stocks and portfolio parquet data sets.

The module normalises both inputs into temp views (``stocks_enriched``,
``portfolio`` and ``portfolio_positions``) and runs five queries, (a)-(e),
against them.
"""
from __future__ import annotations

from typing import Iterable, Sequence

from pyspark.sql import Column, DataFrame, SparkSession, functions as F, types as T

from sparkstart import scon, spark

HDFSPATH = "hdfs://193.174.205.250:54310/"

# SQL expression that re-derives a date from the columns of ``stocks_enriched``
# when the pre-parsed ``date_value`` is NULL.
_DATE_FALLBACK_EXPR = "COALESCE(date_value, TO_DATE(date_str), TO_DATE(date_str, 'yyyyMMdd'))"

# Spark types accepted as a share count, both for struct members of an array
# holdings column and for the values of a map holdings column.
_NUMERIC_TYPES = (
    T.ByteType,
    T.ShortType,
    T.IntegerType,
    T.LongType,
    T.FloatType,
    T.DoubleType,
    T.DecimalType,
)


def _resolve_column_name(columns: Sequence[str], candidates: Iterable[str]) -> str:
    """Return the first candidate that names a column, ignoring case.

    Args:
        columns: Column names as they appear in the data.
        candidates: Acceptable names, in order of preference.

    Returns:
        The matching column name with its original spelling from ``columns``.

    Raises:
        ValueError: If no candidate matches any column.
    """
    # Materialise once: ``candidates`` may be a one-shot iterator and is needed
    # again for the error message after the search loop has consumed it.
    candidate_names = tuple(candidates)
    by_lowercase = {name.lower(): name for name in columns}
    for candidate in candidate_names:
        match = by_lowercase.get(candidate.lower())
        if match is not None:
            return match
    raise ValueError(
        f"None of the candidate columns {list(candidate_names)} exist in {columns}"
    )


def _normalize_stocks_view(spark: SparkSession) -> None:
    """Read the stocks parquet file and register the ``stocks_enriched`` view.

    The view exposes ``symbol``, ``date_value`` (parsed date), ``date_str``
    (the raw date cast to string), ``year_value`` and ``close_value`` (double).
    The resulting DataFrame is cached before it is registered.

    Raises:
        ValueError: If the symbol, date or close column cannot be identified.
    """
    stocks_path = HDFSPATH + "stocks/stocks.parquet"
    stocks_df = spark.read.parquet(stocks_path)

    symbol_col = _resolve_column_name(stocks_df.columns, ("symbol", "ticker"))
    date_col = _resolve_column_name(stocks_df.columns, ("date", "pricedate", "dt"))
    close_col = _resolve_column_name(
        stocks_df.columns, ("close", "closeprice", "closingprice")
    )

    stocks_df = stocks_df.select(
        F.col(symbol_col).alias("symbol"),
        F.col(date_col).alias("raw_date"),
        F.col(close_col).alias("close_raw"),
    ).withColumn("date_str", F.col("raw_date").cast("string"))

    # Tried in order; the first conversion that yields a non-NULL date wins.
    date_candidates = [
        F.col("raw_date").cast("date"),
        F.to_date("raw_date"),
        F.to_date("date_str"),
        F.to_date("date_str", "yyyyMMdd"),
        F.to_date("date_str", "MM/dd/yyyy"),
    ]

    stocks_df = (
        stocks_df
        .withColumn("date_value", F.coalesce(*date_candidates))
        # Prefer the year of the parsed date: the leading four characters of
        # the string are only a year for year-first formats, not for
        # MM/dd/yyyy. The substring stays as a fallback for unparsed dates.
        .withColumn(
            "year_value",
            F.coalesce(
                F.year("date_value"),
                F.substring("date_str", 1, 4).cast("int"),
            ),
        )
        .withColumn("close_value", F.col("close_raw").cast("double"))
        .select("symbol", "date_value", "date_str", "year_value", "close_value")
    )

    stocks_df.cache()
    stocks_df.createOrReplaceTempView("stocks_enriched")


def _pick_first_numeric_field(fields: Sequence[T.StructField]) -> str:
    """Return the name of the first field whose data type is numeric.

    Raises:
        ValueError: If none of ``fields`` has a numeric data type.
    """
    for field in fields:
        if isinstance(field.dataType, _NUMERIC_TYPES):
            return field.name
    raise ValueError("No numeric field found inside the holdings struct")


def _resolve_portfolio_id_field(schema: T.StructType) -> str:
    """Return the name of the column that identifies a portfolio.

    Well-known names are tried first (case-insensitively); otherwise the first
    column that is neither an array nor a map is used.

    Raises:
        ValueError: If every column is an array or a map.
    """
    priority = ("portfolio_id", "portfolioid", "id")
    by_lowercase = {field.name.lower(): field.name for field in schema.fields}
    for candidate in priority:
        if candidate in by_lowercase:
            return by_lowercase[candidate]

    for field in schema.fields:
        if not isinstance(field.dataType, (T.ArrayType, T.MapType)):
            return field.name
    raise ValueError("Portfolio schema does not contain a non-collection id column")


def _normalize_holdings(df: DataFrame) -> Column:
    """Build an expression yielding holdings as ``array<struct<symbol, shares>>``.

    Two layouts are recognised: an array of structs (symbol member plus the
    first numeric member as share count) and a string-keyed map with numeric
    values (key is the symbol, value the share count). An array-of-struct
    column takes precedence over a map column.

    Args:
        df: The raw portfolio DataFrame; only its schema is inspected.

    Returns:
        A Column expression producing the normalised holdings array.

    Raises:
        ValueError: If no suitable array or map column exists, or if the
            holdings struct lacks a symbol or numeric member.
    """
    array_field = None
    map_field = None
    for field in df.schema.fields:
        dtype = field.dataType
        if isinstance(dtype, T.ArrayType) and isinstance(dtype.elementType, T.StructType):
            array_field = field
            break
        if isinstance(dtype, T.MapType) and isinstance(dtype.keyType, T.StringType):
            map_field = field

    if array_field is not None:
        struct_fields = array_field.dataType.elementType.fields
        symbol_field = _resolve_column_name(
            [f.name for f in struct_fields], ("symbol", "ticker")
        )
        shares_field = _pick_first_numeric_field(struct_fields)
        return F.expr(
            f"transform(`{array_field.name}`, x -> named_struct("
            f"'symbol', x.`{symbol_field}`, "
            f"'shares', CAST(x.`{shares_field}` AS DOUBLE)))"
        )

    # Same numeric set as for struct members, so byte/short share counts are
    # accepted in map-shaped holdings as well.
    if map_field is not None and isinstance(map_field.dataType.valueType, _NUMERIC_TYPES):
        return F.expr(
            f"transform(map_entries(`{map_field.name}`), x -> named_struct("
            "'symbol', x.key, 'shares', CAST(x.value AS DOUBLE)))"
        )

    raise ValueError("Could not locate holdings column (array or map) in portfolio data")


def _normalize_portfolio_view(spark: SparkSession) -> None:
    """Read the portfolio parquet file and register the portfolio views.

    Registers ``portfolio`` (``portfolio_id`` plus the normalised ``holdings``
    array, cached) and ``portfolio_positions`` (one row per portfolio and
    holding with ``portfolio_id``, ``symbol`` and ``shares``).

    Raises:
        ValueError: If the id column or the holdings column cannot be found.
    """
    portfolio_path = HDFSPATH + "stocks/portfolio.parquet"
    portfolio_df = spark.read.parquet(portfolio_path)

    id_col = _resolve_portfolio_id_field(portfolio_df.schema)
    holdings_expr = _normalize_holdings(portfolio_df)

    normalized_df = portfolio_df.select(
        F.col(id_col).alias("portfolio_id"),
        holdings_expr.alias("holdings"),
    )

    normalized_df.cache()
    normalized_df.createOrReplaceTempView("portfolio")

    spark.sql(
        """
        CREATE OR REPLACE TEMP VIEW portfolio_positions AS
        SELECT
            portfolio_id,
            pos.symbol AS symbol,
            pos.shares AS shares
        FROM portfolio
        LATERAL VIEW explode(holdings) exploded AS pos
        """
    )


def register_base_views(spark: SparkSession) -> None:
    """Register every temp view the query functions in this module rely on."""
    _normalize_stocks_view(spark)
    _normalize_portfolio_view(spark)


def query_first_and_last_listing(spark: SparkSession) -> DataFrame:
    """(a) Return the earliest and latest quote date per symbol."""
    q = f"""
        SELECT
            symbol,
            MIN({_DATE_FALLBACK_EXPR}) AS first_listing,
            MAX({_DATE_FALLBACK_EXPR}) AS last_listing
        FROM stocks_enriched
        WHERE symbol IS NOT NULL
        GROUP BY symbol
        ORDER BY symbol
    """
    return spark.sql(q)


def query_close_stats_2009(spark: SparkSession) -> DataFrame:
    """(b) Return max, min and average closing price per symbol for 2009."""
    q = """
        SELECT
            symbol,
            MAX(close_value) AS max_close,
            MIN(close_value) AS min_close,
            AVG(close_value) AS avg_close
        FROM stocks_enriched
        WHERE year_value = 2009
          AND close_value IS NOT NULL
          AND symbol IS NOT NULL
        GROUP BY symbol
        ORDER BY symbol
    """
    return spark.sql(q)


def query_portfolio_symbol_stats(spark: SparkSession) -> DataFrame:
    """(c) Return share totals, portfolio counts and average shares per symbol."""
    q = """
        SELECT
            symbol,
            SUM(shares) AS total_shares,
            COUNT(DISTINCT portfolio_id) AS portfolio_count,
            AVG(shares) AS avg_shares_per_portfolio
        FROM portfolio_positions
        WHERE symbol IS NOT NULL
        GROUP BY symbol
        ORDER BY symbol
    """
    return spark.sql(q)


def query_symbols_missing_in_portfolios(spark: SparkSession) -> DataFrame:
    """(d) Return the symbols that have quotes but occur in no portfolio."""
    q = """
        SELECT DISTINCT s.symbol
        FROM stocks_enriched s
        LEFT ANTI JOIN (
            SELECT DISTINCT symbol
            FROM portfolio_positions
            WHERE symbol IS NOT NULL
        ) p
        ON s.symbol = p.symbol
        WHERE s.symbol IS NOT NULL
        ORDER BY s.symbol
    """
    return spark.sql(q)


def query_portfolio_values_2010(spark: SparkSession) -> DataFrame:
    """(e) Return each portfolio's value at the last 2010 close of its symbols.

    Positions whose symbol has no 2010 quote do not contribute to the sum
    (inner join), and portfolios without any matching position are omitted.
    """
    q = f"""
        WITH quotes_2010 AS (
            SELECT
                symbol,
                close_value,
                ROW_NUMBER() OVER (
                    PARTITION BY symbol
                    ORDER BY {_DATE_FALLBACK_EXPR} DESC, date_str DESC
                ) AS rn
            FROM stocks_enriched
            WHERE year_value = 2010
              AND symbol IS NOT NULL
              AND close_value IS NOT NULL
        ),
        last_quotes AS (
            SELECT symbol, close_value
            FROM quotes_2010
            WHERE rn = 1
        ),
        portfolio_values AS (
            SELECT
                pp.portfolio_id,
                SUM(pp.shares * lq.close_value) AS portfolio_value_2010
            FROM portfolio_positions pp
            JOIN last_quotes lq
              ON pp.symbol = lq.symbol
            GROUP BY pp.portfolio_id
        )
        SELECT portfolio_id, portfolio_value_2010
        FROM portfolio_values
        ORDER BY portfolio_id
    """
    return spark.sql(q)


def main(scon, spark):
    """Register the base views and print the first 20 rows of each query.

    Args:
        scon: Accepted for call compatibility; not used by this function.
        spark: The SparkSession used for all reads and queries.
    """
    register_base_views(spark)

    print("(a) Erste und letzte Notierung je Symbol:")
    query_first_and_last_listing(spark).show(20, truncate=False)

    print("(b) Schlusskurs-Statistiken 2009 je Symbol:")
    query_close_stats_2009(spark).show(20, truncate=False)

    print("(c) Portfolio-Kennzahlen je Symbol:")
    query_portfolio_symbol_stats(spark).show(20, truncate=False)

    print("(d) Symbole ohne Portfolio-Vorkommen:")
    query_symbols_missing_in_portfolios(spark).show(20, truncate=False)

    print("(e) Portfoliowerte Ende 2010:")
    query_portfolio_values_2010(spark).show(20, truncate=False)


if __name__ == "__main__":
    main(scon, spark)