Compare commits

..

5 Commits

Author SHA1 Message Date
622a228fb7 12 2025-12-12 13:21:24 +01:00
d18e9823e5 add Spark configuration setup 2025-12-11 20:51:00 +01:00
2b26188647 Aufgabe11 2025-12-11 20:46:38 +01:00
f89d39d420 Aufgabe 10 2025-12-11 20:39:21 +01:00
de3782d570 fix 2025-12-04 17:42:03 +01:00
6 changed files with 765 additions and 71 deletions

View File

@@ -5,7 +5,6 @@ import matplotlib.pyplot as plt
HDFSPATH = "hdfs://193.174.205.250:54310/"
def read_parquets(spark: SparkSession):
stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
@@ -59,86 +58,96 @@ def duration_circle_size(spark: SparkSession):
def compute_daily_and_yearly_frosts(spark: SparkSession):
q_daily_max = (
"SELECT stationId, date, SUBSTR(date,1,4) AS year, MAX(TT_TU) AS max_temp "
"FROM german_stations_data "
"GROUP BY stationId, date"
)
daily_max = spark.sql(q_daily_max)
daily_max.createOrReplaceTempView('daily_max')
q_daily_max = (
"SELECT stationId, date, SUBSTR(CAST(date AS STRING),1,4) AS year, MAX(TT_TU) AS max_temp "
"FROM german_stations_data "
"WHERE TT_TU IS NOT NULL AND TT_TU > -50 AND TT_TU < 60 "
"GROUP BY stationId, date"
)
daily_max = spark.sql(q_daily_max)
daily_max.createOrReplaceTempView('daily_max')
# mark a day as frost if max_temp < 0
q_daily_frost = (
"SELECT stationId, year, CASE WHEN max_temp < 0 THEN 1 ELSE 0 END AS is_frost "
"FROM daily_max"
)
daily_frost = spark.sql(q_daily_frost)
daily_frost.createOrReplaceTempView('daily_frost')
# mark a day as frost if max_temp < 0
q_daily_frost = (
"SELECT stationId, year, CASE WHEN max_temp < 0 THEN 1 ELSE 0 END AS is_frost "
"FROM daily_max"
)
daily_frost = spark.sql(q_daily_frost)
daily_frost.createOrReplaceTempView('daily_frost')
# yearly frostdays per station
q_station_year = (
"SELECT stationId, year, SUM(is_frost) AS frost_days "
"FROM daily_frost GROUP BY stationId, year"
)
station_year_frost = spark.sql(q_station_year)
station_year_frost.createOrReplaceTempView('station_year_frost')
# yearly frostdays per station
q_station_year = (
"SELECT stationId, year, SUM(is_frost) AS frost_days "
"FROM daily_frost GROUP BY stationId, year"
)
station_year_frost = spark.sql(q_station_year)
station_year_frost.createOrReplaceTempView('station_year_frost')
def frost_analysis(spark: SparkSession, year=2024, station_name_matches=('kempten',)):
compute_daily_and_yearly_frosts(spark)
compute_daily_and_yearly_frosts(spark)
q_hist = (
f"SELECT frost_days, COUNT(*) AS station_count "
f"FROM station_year_frost WHERE year = '{year}' GROUP BY frost_days ORDER BY frost_days"
)
hist_df = spark.sql(q_hist)
q_hist = (
f"SELECT frost_days, COUNT(*) AS station_count "
f"FROM station_year_frost WHERE year = '{year}' GROUP BY frost_days ORDER BY frost_days"
)
hist_df = spark.sql(q_hist)
hist_pdf = hist_df.toPandas()
plt.figure(figsize=(8, 5))
plt.bar(hist_pdf.frost_days, hist_pdf.station_count, color='steelblue')
plt.xlabel('Number of Frost Days in year ' + str(year))
plt.ylabel('Number of Stations')
plt.title(f'Stations vs Frost Days ({year})')
plt.tight_layout()
plt.show()
hist_pdf = hist_df.toPandas()
for name in station_name_matches:
q_find = f"SELECT stationId, station_name FROM german_stations WHERE lower(station_name) LIKE '%{name.lower()}%'"
ids_df = spark.sql(q_find)
ids = ids_df.collect()
if not ids:
print(f"No stations found matching '{name}'")
continue
for r in ids:
sid = r['stationId']
sname = r['station_name']
print(f"Analyzing stationId={sid} name={sname}")
if hist_pdf.empty:
print(f"No frost data found for year {year}. Trying to find available years...")
q_all = "SELECT frost_days, COUNT(*) AS station_count FROM station_year_frost GROUP BY frost_days ORDER BY frost_days"
hist_pdf = spark.sql(q_all).toPandas()
if hist_pdf.empty:
print("No frost data available at all. Check if TT_TU column contains valid temperature data.")
return
print(f"Found {len(hist_pdf)} frost day categories across all years")
# compute frostdays + 5-yr and 20-yr rolling averages using window frame
q_ts = (
"SELECT year, frost_days, "
"AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_5, "
"AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg_20 "
f"FROM station_year_frost WHERE stationId = {sid} ORDER BY CAST(year AS INT)"
)
ts_df = spark.sql(q_ts)
plt.figure(figsize=(8, 5))
plt.bar(hist_pdf.frost_days, hist_pdf.station_count, color='steelblue')
plt.xlabel('Number of Frost Days in year ' + str(year))
plt.ylabel('Number of Stations')
plt.title(f'Stations vs Frost Days ({year})')
plt.tight_layout()
plt.show()
pdf = ts_df.toPandas()
if pdf.empty:
print(f"No yearly frost data for station {sid}")
continue
for name in station_name_matches:
q_find = f"SELECT stationId, station_name FROM german_stations WHERE lower(station_name) LIKE '%{name.lower()}%'"
ids_df = spark.sql(q_find)
ids = ids_df.collect()
if not ids:
print(f"No stations found matching '{name}'")
continue
for r in ids:
sid = r['stationId']
sname = r['station_name']
print(f"Analyzing stationId={sid} name={sname}")
pdf['year'] = pdf['year'].astype(int)
plt.figure(figsize=(10, 5))
plt.plot(pdf.year, pdf.frost_days, label='Frostdays (year)', marker='o')
plt.plot(pdf.year, pdf.avg_5, label='5-year avg', linestyle='--')
plt.plot(pdf.year, pdf.avg_20, label='20-year avg', linestyle=':')
plt.xlabel('Year')
plt.ylabel('Frost Days')
plt.title(f'Frost Days over Years for {sname} (station {sid})')
plt.legend()
plt.tight_layout()
plt.show()
q_ts = (
"SELECT year, frost_days, "
"AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_5, "
"AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) RANGE BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg_20 "
f"FROM station_year_frost WHERE stationId = {sid} ORDER BY CAST(year AS INT)"
)
ts_df = spark.sql(q_ts)
pdf = ts_df.toPandas()
if pdf.empty:
print(f"No yearly frost data for station {sid}")
continue
pdf['year'] = pdf['year'].astype(int)
plt.figure(figsize=(10, 5))
plt.plot(pdf.year, pdf.frost_days, label='Frostdays (year)', marker='o')
plt.plot(pdf.year, pdf.avg_5, label='5-year avg', linestyle='--')
plt.plot(pdf.year, pdf.avg_20, label='20-year avg', linestyle=':')
plt.xlabel('Year')
plt.ylabel('Frost Days')
plt.title(f'Frost Days over Years for {sname} (station {sid})')
plt.legend()
plt.tight_layout()
plt.show()
def height_frost_correlation(spark: SparkSession):

366
Aufgabe 11/Aufgabe11.py Normal file
View File

@@ -0,0 +1,366 @@
from __future__ import annotations
from sparkstart import scon, spark
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
HDFSPATH = "hdfs://193.174.205.250:54310/"
def read_parquet_tables(spark: SparkSession) -> None:
"""Load station master data and hourly measurements from parquet if needed."""
stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
stations_df = spark.read.parquet(stations_path)
stations_df.createOrReplaceTempView("german_stations")
stations_df.cache()
products_df = spark.read.parquet(products_path)
products_df.createOrReplaceTempView("german_stations_data")
products_df.cache()
def _escape_like(value: str) -> str:
"""Escape single quotes for safe SQL literal usage."""
return value.replace("'", "''")
def resolve_station_id(spark: SparkSession, station_identifier) -> int:
"""Resolve station id either from int input or fuzzy name search."""
if isinstance(station_identifier, int):
return station_identifier
if isinstance(station_identifier, str) and station_identifier.strip().isdigit():
return int(station_identifier.strip())
if isinstance(station_identifier, str):
needle = _escape_like(station_identifier.lower())
q = (
"SELECT stationId FROM german_stations "
f"WHERE lower(station_name) LIKE '%{needle}%' ORDER BY station_name LIMIT 1"
)
result = spark.sql(q).collect()
if not result:
raise ValueError(f"No station found for pattern '{station_identifier}'")
return int(result[0]["stationId"])
raise ValueError("station_identifier must be int or str")
def build_station_rollup_for_station(spark: SparkSession, station_identifier) -> None:
"""Create rollup view with min/max/avg per hour/day/month/quarter/year."""
station_id = resolve_station_id(spark, station_identifier)
q = f"""
WITH base AS (
SELECT
d.stationId,
gs.station_name,
TO_TIMESTAMP(CONCAT(d.date, LPAD(CAST(d.hour AS STRING), 2, '0')), 'yyyyMMddHH') AS hour_ts,
TO_DATE(d.date, 'yyyyMMdd') AS day_date,
MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS month_in_year,
QUARTER(TO_DATE(d.date, 'yyyyMMdd')) AS quarter_in_year,
YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS year_value,
d.TT_TU AS temperature
FROM german_stations_data d
JOIN german_stations gs ON d.stationId = gs.stationId
WHERE d.stationId = {station_id}
AND d.TT_TU IS NOT NULL
AND d.TT_TU <> -999
),
rollup_base AS (
SELECT
stationId,
station_name,
hour_ts,
day_date,
month_in_year,
quarter_in_year,
year_value,
MIN(temperature) AS min_temp,
MAX(temperature) AS max_temp,
AVG(temperature) AS avg_temp
FROM base
GROUP BY stationId, station_name, ROLLUP(year_value, quarter_in_year, month_in_year, day_date, hour_ts)
)
SELECT
stationId,
station_name,
hour_ts,
day_date,
month_in_year,
quarter_in_year,
year_value,
CASE WHEN month_in_year IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-', LPAD(CAST(month_in_year AS STRING), 2, '0'), '-01')) END AS month_start_date,
CASE WHEN quarter_in_year IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-', LPAD(CAST(quarter_in_year * 3 - 2 AS STRING), 2, '0'), '-01')) END AS quarter_start_date,
CASE WHEN year_value IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-01-01')) END AS year_start_date,
min_temp,
max_temp,
avg_temp
FROM rollup_base
"""
rollup_df = spark.sql(q)
rollup_df.cache()
rollup_df.createOrReplaceTempView("station_rollup")
def _year_window(spark: SparkSession, years_back: int, station_id: int) -> tuple[int, int] | None:
stats = spark.sql(
f"SELECT MIN(year_value) AS min_year, MAX(year_value) AS max_year FROM station_rollup WHERE year_value IS NOT NULL AND stationId = {station_id}"
).collect()
if not stats or stats[0]["max_year"] is None:
return None
min_year = int(stats[0]["min_year"])
max_year = int(stats[0]["max_year"])
start_year = max(min_year, max_year - years_back + 1)
return start_year, max_year
def plot_station_rollup_levels(
spark: SparkSession,
station_identifier,
day_span_years: int = 3,
agg_span_years: int = 15,
) -> None:
"""Plot day, month, quarter, and year aggregates for the given station."""
station_id = resolve_station_id(spark, station_identifier)
needs_refresh = not spark.catalog.tableExists("station_rollup")
if not needs_refresh:
count = spark.sql(
f"SELECT COUNT(*) AS cnt FROM station_rollup WHERE stationId = {station_id}"
).collect()[0]["cnt"]
needs_refresh = count == 0
if needs_refresh:
build_station_rollup_for_station(spark, station_id)
day_window = _year_window(spark, day_span_years, station_id)
if day_window is None:
print("No data available for plotting")
return
month_window = _year_window(spark, agg_span_years, station_id)
if month_window is None:
print("No aggregated window available")
return
def _plot(query: str, figure_idx: int, title: str, x_col: str = "bucket_date") -> None:
pdf = spark.sql(query).toPandas()
if pdf.empty:
print(f"No data for {title}")
return
plt.figure(num=figure_idx)
plt.clf()
metrics = [
("min_temp", "Min", "#1f77b4"),
("avg_temp", "Avg", "#ff7f0e"),
("max_temp", "Max", "#2ca02c"),
]
for col, label, color in metrics:
if col in pdf:
plt.plot(pdf[x_col], pdf[col], label=label, color=color)
plt.title(title)
plt.xlabel("Datum")
plt.ylabel("Temperatur (°C)")
plt.legend()
plt.tight_layout()
plt.show()
day_start, day_end = day_window
q_day = f"""
SELECT day_date AS bucket_date, min_temp, avg_temp, max_temp
FROM station_rollup
WHERE stationId = {station_id}
AND hour_ts IS NULL
AND day_date IS NOT NULL
AND year_value BETWEEN {day_start} AND {day_end}
ORDER BY bucket_date
"""
_plot(q_day, 1, f"Tagesmittelwerte {day_start}-{day_end}")
agg_start, agg_end = month_window
q_month = f"""
SELECT month_start_date AS bucket_date, min_temp, avg_temp, max_temp
FROM station_rollup
WHERE stationId = {station_id}
AND day_date IS NULL
AND month_in_year IS NOT NULL
AND year_value BETWEEN {agg_start} AND {agg_end}
ORDER BY bucket_date
"""
_plot(q_month, 2, f"Monatsmittelwerte {agg_start}-{agg_end}")
q_quarter = f"""
SELECT quarter_start_date AS bucket_date, min_temp, avg_temp, max_temp
FROM station_rollup
WHERE stationId = {station_id}
AND month_in_year IS NULL
AND quarter_in_year IS NOT NULL
AND year_value BETWEEN {agg_start} AND {agg_end}
ORDER BY bucket_date
"""
_plot(q_quarter, 3, f"Quartalsmittelwerte {agg_start}-{agg_end}")
q_year = f"""
SELECT year_start_date AS bucket_date, min_temp, avg_temp, max_temp
FROM station_rollup
WHERE stationId = {station_id}
AND quarter_in_year IS NULL
AND year_value IS NOT NULL
ORDER BY bucket_date
"""
_plot(q_year, 4, "Jahresmittelwerte")
def create_tempmonat(spark: SparkSession) -> None:
"""Create cached temp table tempmonat with monthly aggregates per station."""
q = """
SELECT
d.stationId,
gs.station_name,
YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS year_value,
MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS month_value,
MIN(d.TT_TU) AS min_temp,
MAX(d.TT_TU) AS max_temp,
AVG(d.TT_TU) AS avg_temp
FROM german_stations_data d
JOIN german_stations gs ON d.stationId = gs.stationId
WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999
GROUP BY d.stationId, gs.station_name, YEAR(TO_DATE(d.date, 'yyyyMMdd')), MONTH(TO_DATE(d.date, 'yyyyMMdd'))
"""
monthly_df = spark.sql(q)
monthly_df.cache()
monthly_df.createOrReplaceTempView("tempmonat")
def rank_coldest_per_month_2015(spark: SparkSession):
"""Rank stations by coldest values per month for 2015 using tempmonat."""
return spark.sql(
"""
SELECT
stationId,
station_name,
year_value,
month_value,
min_temp,
max_temp,
avg_temp,
RANK() OVER (PARTITION BY month_value ORDER BY min_temp ASC) AS rank_min,
RANK() OVER (PARTITION BY month_value ORDER BY max_temp ASC) AS rank_max,
RANK() OVER (PARTITION BY month_value ORDER BY avg_temp ASC) AS rank_avg
FROM tempmonat
WHERE year_value = 2015
ORDER BY rank_min, month_value
"""
)
def rank_coldest_overall(spark: SparkSession):
"""Rank stations by coldest values over all months/years (no partition)."""
return spark.sql(
"""
SELECT
stationId,
station_name,
year_value,
month_value,
min_temp,
max_temp,
avg_temp,
RANK() OVER (ORDER BY min_temp ASC) AS rank_min,
RANK() OVER (ORDER BY max_temp ASC) AS rank_max,
RANK() OVER (ORDER BY avg_temp ASC) AS rank_avg
FROM tempmonat
ORDER BY rank_min
"""
)
def create_grouping_sets_overview(spark: SparkSession) -> None:
"""Compute grouping sets for requested aggregations and cache the result."""
q = """
WITH base AS (
SELECT
YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS year_value,
MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS month_value,
gs.bundesland,
gs.stationId,
gs.station_name,
d.TT_TU AS temperature
FROM german_stations_data d
JOIN german_stations gs ON d.stationId = gs.stationId
WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999
)
SELECT
year_value,
month_value,
bundesland,
stationId,
station_name,
MIN(temperature) AS min_temp,
MAX(temperature) AS max_temp,
AVG(temperature) AS avg_temp
FROM base
GROUP BY GROUPING SETS (
(year_value, bundesland),
(year_value, stationId, station_name, bundesland),
(month_value, bundesland)
)
"""
grouped_df = spark.sql(q)
grouped_df.cache()
grouped_df.createOrReplaceTempView("grouping_sets_stats")
def select_year_bundesland(spark: SparkSession):
return spark.sql(
"""
SELECT year_value, bundesland, min_temp, max_temp, avg_temp
FROM grouping_sets_stats
WHERE bundesland IS NOT NULL AND month_value IS NULL AND stationId IS NULL
ORDER BY year_value, bundesland
"""
)
def select_year_station(spark: SparkSession):
return spark.sql(
"""
SELECT year_value, stationId, station_name, min_temp, max_temp, avg_temp
FROM grouping_sets_stats
WHERE stationId IS NOT NULL AND month_value IS NULL
ORDER BY year_value, stationId
"""
)
def select_month_bundesland(spark: SparkSession):
return spark.sql(
"""
SELECT month_value, bundesland, min_temp, max_temp, avg_temp
FROM grouping_sets_stats
WHERE month_value IS NOT NULL AND year_value IS NULL
ORDER BY month_value, bundesland
"""
)
def main(scon, spark):
read_parquet_tables(spark)
build_station_rollup_for_station(spark, "kempten")
plot_station_rollup_levels(spark, "kempten")
create_tempmonat(spark)
print("Rangfolgen 2015 je Monat:")
rank_coldest_per_month_2015(spark).show(36, truncate=False)
print("Rangfolgen gesamt:")
rank_coldest_overall(spark).show(36, truncate=False)
create_grouping_sets_overview(spark)
print("Jahr vs Bundesland:")
select_year_bundesland(spark).show(20, truncate=False)
print("Jahr vs Station:")
select_year_station(spark).show(20, truncate=False)
print("Monat vs Bundesland:")
select_month_bundesland(spark).show(20, truncate=False)
if __name__ == "__main__":
main(scon, spark)

22
Aufgabe 11/sparkstart.py Normal file
View File

@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
Erzeugen einer Spark-Konfiguration
"""
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# connect to cluster
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.executor.memory", '32g')
conf.set("spark.driver.memory", '8g')
conf.set("spark.cores.max", "40")
scon = SparkContext(conf=conf)
spark = SparkSession \
.builder \
.appName("Python Spark SQL") \
.getOrCreate()

276
Aufgabe 12/Aufgabe12.py Normal file
View File

@@ -0,0 +1,276 @@
from __future__ import annotations
from typing import Iterable, Sequence
from pyspark.sql import SparkSession, functions as F, types as T
from sparkstart import scon, spark
HDFSPATH = "hdfs://193.174.205.250:54310/"
_DATE_FALLBACK_EXPR = "COALESCE(date_value, TO_DATE(date_str), TO_DATE(date_str, 'yyyyMMdd'))"
def _resolve_column_name(columns: Sequence[str], candidates: Iterable[str]) -> str:
lowered = {col.lower(): col for col in columns}
for candidate in candidates:
match = lowered.get(candidate.lower())
if match:
return match
raise ValueError(f"None of the candidate columns {list(candidates)} exist in {columns}")
def _normalize_stocks_view(spark: SparkSession) -> None:
stocks_path = HDFSPATH + "stocks/stocks.parquet"
stocks_df = spark.read.parquet(stocks_path)
symbol_col = _resolve_column_name(stocks_df.columns, ("symbol", "ticker"))
date_col = _resolve_column_name(stocks_df.columns, ("date", "pricedate", "dt"))
close_col = _resolve_column_name(stocks_df.columns, ("close", "closeprice", "closingprice"))
stocks_df = (
stocks_df
.select(
F.col(symbol_col).alias("symbol"),
F.col(date_col).alias("raw_date"),
F.col(close_col).alias("close_raw"),
)
.withColumn("date_str", F.col("raw_date").cast("string"))
)
date_candidates = [
F.col("raw_date").cast("date"),
F.to_date("raw_date"),
F.to_date("date_str"),
F.to_date("date_str", "yyyyMMdd"),
F.to_date("date_str", "MM/dd/yyyy"),
]
stocks_df = (
stocks_df
.withColumn("date_value", F.coalesce(*date_candidates))
.withColumn("year_value", F.substring("date_str", 1, 4).cast("int"))
.withColumn("close_value", F.col("close_raw").cast("double"))
.select("symbol", "date_value", "date_str", "year_value", "close_value")
)
stocks_df.cache()
stocks_df.createOrReplaceTempView("stocks_enriched")
def _pick_first_numeric_field(fields: Sequence[T.StructField]) -> str:
numeric_types = (
T.ByteType,
T.ShortType,
T.IntegerType,
T.LongType,
T.FloatType,
T.DoubleType,
T.DecimalType,
)
for field in fields:
if isinstance(field.dataType, numeric_types):
return field.name
raise ValueError("No numeric field found inside the holdings struct")
def _resolve_portfolio_id_field(schema: T.StructType) -> str:
priority = ("portfolio_id", "portfolioid", "id")
lowered = {field.name.lower(): field.name for field in schema.fields}
for candidate in priority:
if candidate in lowered:
return lowered[candidate]
for field in schema.fields:
if not isinstance(field.dataType, (T.ArrayType, T.MapType)):
return field.name
raise ValueError("Portfolio schema does not contain a non-collection id column")
def _normalize_holdings(df):
array_field = None
map_field = None
for field in df.schema.fields:
if isinstance(field.dataType, T.ArrayType) and isinstance(field.dataType.elementType, T.StructType):
array_field = field
break
if isinstance(field.dataType, T.MapType) and isinstance(field.dataType.keyType, T.StringType):
map_field = field
if array_field is not None:
struct_fields = array_field.dataType.elementType.fields
symbol_field = _resolve_column_name([f.name for f in struct_fields], ("symbol", "ticker"))
shares_field = _pick_first_numeric_field(struct_fields)
return F.expr(
f"transform(`{array_field.name}`, x -> named_struct('symbol', x.`{symbol_field}`, 'shares', CAST(x.`{shares_field}` AS DOUBLE)))"
)
if map_field is not None and isinstance(map_field.dataType.valueType, (T.IntegerType, T.LongType, T.FloatType, T.DoubleType, T.DecimalType)):
return F.expr(
f"transform(map_entries(`{map_field.name}`), x -> named_struct('symbol', x.key, 'shares', CAST(x.value AS DOUBLE)))"
)
raise ValueError("Could not locate holdings column (array<struct> or map) in portfolio data")
def _normalize_portfolio_view(spark: SparkSession) -> None:
portfolio_path = HDFSPATH + "stocks/portfolio.parquet"
portfolio_df = spark.read.parquet(portfolio_path)
id_col = _resolve_portfolio_id_field(portfolio_df.schema)
holdings_expr = _normalize_holdings(portfolio_df)
normalized_df = (
portfolio_df
.select(
F.col(id_col).alias("portfolio_id"),
holdings_expr.alias("holdings"),
)
)
normalized_df.cache()
normalized_df.createOrReplaceTempView("portfolio")
spark.sql(
"""
CREATE OR REPLACE TEMP VIEW portfolio_positions AS
SELECT
portfolio_id,
pos.symbol AS symbol,
pos.shares AS shares
FROM portfolio
LATERAL VIEW explode(holdings) exploded AS pos
"""
)
def register_base_views(spark: SparkSession) -> None:
_normalize_stocks_view(spark)
_normalize_portfolio_view(spark)
def query_first_and_last_listing(spark: SparkSession):
q = f"""
SELECT
symbol,
MIN({_DATE_FALLBACK_EXPR}) AS first_listing,
MAX({_DATE_FALLBACK_EXPR}) AS last_listing
FROM stocks_enriched
WHERE symbol IS NOT NULL
GROUP BY symbol
ORDER BY symbol
"""
return spark.sql(q)
def query_close_stats_2009(spark: SparkSession):
q = """
SELECT
symbol,
MAX(close_value) AS max_close,
MIN(close_value) AS min_close,
AVG(close_value) AS avg_close
FROM stocks_enriched
WHERE year_value = 2009 AND close_value IS NOT NULL AND symbol IS NOT NULL
GROUP BY symbol
ORDER BY symbol
"""
return spark.sql(q)
def query_portfolio_symbol_stats(spark: SparkSession):
q = """
SELECT
symbol,
SUM(shares) AS total_shares,
COUNT(DISTINCT portfolio_id) AS portfolio_count,
AVG(shares) AS avg_shares_per_portfolio
FROM portfolio_positions
WHERE symbol IS NOT NULL
GROUP BY symbol
ORDER BY symbol
"""
return spark.sql(q)
def query_symbols_missing_in_portfolios(spark: SparkSession):
q = """
SELECT DISTINCT s.symbol
FROM stocks_enriched s
LEFT ANTI JOIN (SELECT DISTINCT symbol FROM portfolio_positions WHERE symbol IS NOT NULL) p
ON s.symbol = p.symbol
WHERE s.symbol IS NOT NULL
ORDER BY s.symbol
"""
return spark.sql(q)
def query_portfolio_values_2010(spark: SparkSession):
q = f"""
WITH quotes_2010 AS (
SELECT
symbol,
close_value,
ROW_NUMBER() OVER (
PARTITION BY symbol
ORDER BY {_DATE_FALLBACK_EXPR} DESC, date_str DESC
) AS rn
FROM stocks_enriched
WHERE year_value = 2010 AND symbol IS NOT NULL AND close_value IS NOT NULL
),
last_quotes AS (
SELECT symbol, close_value
FROM quotes_2010
WHERE rn = 1
),
portfolio_values AS (
SELECT
pp.portfolio_id,
SUM(pp.shares * lq.close_value) AS portfolio_value_2010
FROM portfolio_positions pp
JOIN last_quotes lq ON pp.symbol = lq.symbol
GROUP BY pp.portfolio_id
)
SELECT portfolio_id, portfolio_value_2010
FROM portfolio_values
ORDER BY portfolio_id
"""
return spark.sql(q)
def main(scon, spark):
register_base_views(spark)
print("(a) Erste und letzte Notierung je Symbol:")
query_first_and_last_listing(spark).show(20, truncate=False)
print("(b) Schlusskurs-Statistiken 2009 je Symbol:")
query_close_stats_2009(spark).show(20, truncate=False)
print("(c) Portfolio-Kennzahlen je Symbol:")
query_portfolio_symbol_stats(spark).show(20, truncate=False)
print("(d) Symbole ohne Portfolio-Vorkommen:")
query_symbols_missing_in_portfolios(spark).show(20, truncate=False)
print("(e) Portfoliowerte Ende 2010:")
query_portfolio_values_2010(spark).show(20, truncate=False)
if __name__ == "__main__":
main(scon, spark)

22
Aufgabe 12/sparkstart.py Normal file
View File

@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
Erzeugen einer Spark-Konfiguration
"""
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# connect to cluster
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.executor.memory", '32g')
conf.set("spark.driver.memory", '8g')
conf.set("spark.cores.max", "40")
scon = SparkContext(conf=conf)
spark = SparkSession \
.builder \
.appName("Python Spark SQL") \
.getOrCreate()

View File

@@ -145,7 +145,6 @@ def import_produkt_files(spark: SparkSession, scon: SparkContext, path='/data/cd
)
def read_product_data_from_parquet(spark):
"""
read_product_data_from_parquet(spark)