Compare commits

...

1 Commit

SHA1: 1b2de95b2e · Message: 12 · Date: 2025-12-11 21:30:51 +01:00
4 changed files with 229 additions and 175 deletions

View File

@@ -5,7 +5,6 @@ import matplotlib.pyplot as plt
 HDFSPATH = "hdfs://193.174.205.250:54310/"

 def read_parquets(spark: SparkSession):
     stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
     products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
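A quick way to verify the temp views that read_parquets registers is a row-count smoke test. This is a minimal sketch and not part of the commit; only the view names (german_stations, german_stations_data) are taken from the diff, the helper itself is hypothetical:

    # Hypothetical smoke test for the temp views registered by read_parquets();
    # view names come from this diff, the check itself is not in the commit.
    def check_views(spark):
        for view in ("german_stations", "german_stations_data"):
            n = spark.sql(f"SELECT COUNT(*) AS n FROM {view}").collect()[0]["n"]
            print(f"{view}: {n} rows")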

View File

@@ -1,55 +1,57 @@
 from sparkstart import scon, spark
 from pyspark.sql import SparkSession
 import time
 import matplotlib.pyplot as plt
 import pandas as pd

-HDFSPATH_STATIONS = "hdfs://193.174.205.250:54310/home/heiserervalentin/"
+HDFSPATH = "hdfs://193.174.205.250:54310/"
 HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/"

-def init_view_stations(spark):
-    """Lädt die Stationsdaten für Aufgabe 11"""
-    s_path = HDFSPATH_STATIONS + "german_stations.parquet"
-    d_path = HDFSPATH_STATIONS + "german_stations_data.parquet"
-    spark.read.parquet(s_path).createOrReplaceTempView("german_stations")
-    spark.read.parquet(d_path).createOrReplaceTempView("german_stations_data")
-
-def init_view_stocks(spark):
-    """Lädt die Stocks-Daten für Aufgabe 12"""
-    # Hinweis: Pfade anpassen, falls sie im Cluster anders liegen
-    spark.read.parquet(HDFSPATH_STOCKS + "stocks.parquet").createOrReplaceTempView("stocks")
-    spark.read.parquet(HDFSPATH_STOCKS + "portfolio.parquet").createOrReplaceTempView("portfolio")
-
-# ---------------------------------------------------------
-# AUFGABE 11
-# ---------------------------------------------------------
+def read_parquets(spark: SparkSession):
+    stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
+    products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
+
+    stations_df = spark.read.parquet(stations_path)
+    stations_df.createOrReplaceTempView("german_stations")
+
+    products_df = spark.read.parquet(products_path)
+    products_df.createOrReplaceTempView("german_stations_data")
+
+    stations_df.cache()
+    products_df.cache()

 def task_11a_rollup(spark: SparkSession, station_name="Kempten"):
     print(f"\n--- Aufgabe 11a: Rollup & Plotting für {station_name} ---")
-    start = time.time()
+    start_time = time.time()

     # 1. Station ID finden
-    sid_df = spark.sql(f"SELECT stationId FROM german_stations WHERE station_name LIKE '%{station_name}%'")
+    # Case-insensitive search
+    sid_df = spark.sql(f"SELECT stationId FROM german_stations WHERE lower(station_name) LIKE '%{station_name.lower()}%'")
     try:
         sid = sid_df.collect()[0]['stationId']
+        print(f"Station found: {station_name} -> ID {sid}")
     except IndexError:
         print(f"Station {station_name} nicht gefunden.")
         return

     # 2. Rollup Query vorbereiten
+    # FIX: Parse string date 'YYYYMMDD' to real DATE object first
     q_prep = f"""
         SELECT
-            YEAR(date) as yr,
-            QUARTER(date) as qt,
-            MONTH(date) as mo,
-            DAY(date) as da,
+            YEAR(TO_DATE(date, 'yyyyMMdd')) as yr,
+            QUARTER(TO_DATE(date, 'yyyyMMdd')) as qt,
+            MONTH(TO_DATE(date, 'yyyyMMdd')) as mo,
+            DAY(TO_DATE(date, 'yyyyMMdd')) as da,
             TT_TU
         FROM german_stations_data
-        WHERE stationId = {sid} AND TT_TU IS NOT NULL AND TT_TU > -50
+        WHERE stationId = {sid}
+          AND TT_TU IS NOT NULL
+          AND TT_TU > -50
+          AND TT_TU < 60
     """
     spark.sql(q_prep).createOrReplaceTempView("data_prep")

+    # 3. Rollup Execution
+    # Note: We use string construction for quarters/months to ensure we get a valid date string for plotting
     q_rollup = """
         SELECT
             yr, qt, mo, da,
@@ -57,10 +59,14 @@ def task_11a_rollup(spark: SparkSession, station_name="Kempten"):
             MAX(TT_TU) as max_temp,
             AVG(TT_TU) as avg_temp,
-            -- Datums-Konstruktion für Plots
-            DATE(STRING(yr) || '-' || STRING(qt*3-2) || '-01') as qt_date,
+            -- Construct dates for plotting (handling the NULLs from ROLLUP)
+            -- For Quarter: Use 1st month of quarter
+            DATE(concat_ws('-', yr, cast(qt*3-2 as int), '01')) as qt_date,
+            -- For Month: Use 1st day of month
             MAKE_DATE(yr, mo, 1) as mo_date,
+            -- For Year: Use Jan 1st
             MAKE_DATE(yr, 1, 1) as yr_date,
+            -- For Day: Use actual date
             MAKE_DATE(yr, mo, da) as da_date
         FROM data_prep
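The GROUP BY of the rollup itself lies outside the hunks shown here. For orientation, a hypothetical sketch of the aggregation over data_prep, using only column names visible above (yr, qt, mo, da, TT_TU); the exact query of the commit is elided and this is an assumption about its shape:

    # Hypothetical sketch of the rollup aggregation behind df_rollup;
    # the commit's actual GROUP BY clause is not visible in this diff.
    q_rollup_sketch = """
        SELECT
            yr, qt, mo, da,
            MIN(TT_TU) AS min_temp,
            MAX(TT_TU) AS max_temp,
            AVG(TT_TU) AS avg_temp
        FROM data_prep
        GROUP BY ROLLUP (yr, qt, mo, da)
    """
    # df_rollup = spark.sql(q_rollup_sketch)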
@@ -71,84 +77,97 @@ def task_11a_rollup(spark: SparkSession, station_name="Kempten"):
     df_rollup.cache()
     df_rollup.createOrReplaceTempView("station_rollup")

-    # Trigger Action for Cache & Time Measurement
+    # Trigger Action
     count = df_rollup.count()
-    print(f"Rollup berechnet. Zeilen: {count}. Dauer: {time.time() - start:.2f}s")
-    input(">> 11a: Check Spark UI (Stages/Storage) jetzt. Enter für Plots...")
+    print(f"Rollup berechnet. Zeilen: {count}. Dauer: {time.time() - start_time:.2f}s")

-    # Plot 1: Tageswerte (letzte 3 Jahre der Daten)
+    # --- PLOTTING ---
+    # Plot 1: Tageswerte (letzte 3 Jahre)
+    # Filter: All levels must be present (not null)
     q_days = """
         SELECT da_date as date, avg_temp
         FROM station_rollup
         WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NOT NULL
-          AND yr >= (SELECT MAX(yr) - 2 FROM station_rollup)
+          AND yr >= (SELECT MAX(yr) - 2 FROM station_rollup WHERE yr IS NOT NULL)
         ORDER BY date
     """
     pdf_days = spark.sql(q_days).toPandas()
-    plt.figure(1, figsize=(10, 5))
-    plt.plot(pdf_days['date'], pdf_days['avg_temp'], label='Daily Avg', linewidth=0.5)
-    plt.title(f"{station_name}: Daily Average (Last 3 Years)")
-    plt.xlabel('Date')
-    plt.ylabel('Temp °C')
-    plt.tight_layout()
-    plt.show()
+    if pdf_days.empty:
+        print("Warnung: Keine Daten für Tages-Plot gefunden.")
+    else:
+        plt.figure(1, figsize=(10, 5))
+        plt.plot(pdf_days['date'], pdf_days['avg_temp'], label='Daily Avg', linewidth=0.5)
+        plt.title(f"{station_name}: Daily Average (Last 3 Years)")
+        plt.xlabel('Date')
+        plt.ylabel('Temp °C')
+        plt.tight_layout()
+        plt.show()

     # Plot 2: Monatswerte (10-20 Jahre)
+    # Filter: Day is NULL (aggregation level), but Month is NOT NULL
     q_months = """
         SELECT mo_date as date, avg_temp
         FROM station_rollup
         WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NULL
-          AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup)
+          AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL)
         ORDER BY date
     """
     pdf_months = spark.sql(q_months).toPandas()
-    plt.figure(2, figsize=(10, 5))
-    plt.plot(pdf_months['date'], pdf_months['avg_temp'], color='green', label='Monthly Avg')
-    plt.title(f"{station_name}: Monthly Average (Last 20 Years)")
-    plt.xlabel('Date')
-    plt.ylabel('Temp °C')
-    plt.tight_layout()
-    plt.show()
+    if not pdf_months.empty:
+        plt.figure(2, figsize=(10, 5))
+        plt.plot(pdf_months['date'], pdf_months['avg_temp'], color='green', label='Monthly Avg')
+        plt.title(f"{station_name}: Monthly Average (Last 20 Years)")
+        plt.xlabel('Date')
+        plt.ylabel('Temp °C')
+        plt.tight_layout()
+        plt.show()

     # Plot 3: Quartalswerte
+    # Filter: Month is NULL, Quarter is NOT NULL
     q_quarters = """
         SELECT qt_date as date, avg_temp
         FROM station_rollup
         WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NULL AND da IS NULL
-          AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup)
+          AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL)
         ORDER BY date
     """
     pdf_quarters = spark.sql(q_quarters).toPandas()
-    plt.figure(3, figsize=(10, 5))
-    plt.plot(pdf_quarters['date'], pdf_quarters['avg_temp'], color='orange', marker='o', linestyle='-', label='Quarterly Avg')
-    plt.title(f"{station_name}: Quarterly Average (Last 20 Years)")
-    plt.show()
+    if not pdf_quarters.empty:
+        plt.figure(3, figsize=(10, 5))
+        plt.plot(pdf_quarters['date'], pdf_quarters['avg_temp'], color='orange', marker='o', linestyle='-', label='Quarterly Avg')
+        plt.title(f"{station_name}: Quarterly Average (Last 20 Years)")
+        plt.tight_layout()
+        plt.show()

     # Plot 4: Jahreswerte
+    # Filter: Quarter is NULL, Year is NOT NULL
     q_years = """
         SELECT yr_date as date, min_temp, max_temp, avg_temp
         FROM station_rollup
         WHERE yr IS NOT NULL AND qt IS NULL AND mo IS NULL AND da IS NULL
-          AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup)
+          AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL)
         ORDER BY date
     """
     pdf_years = spark.sql(q_years).toPandas()
-    plt.figure(4, figsize=(10, 5))
-    plt.plot(pdf_years['date'], pdf_years['max_temp'], color='red', label='Max')
-    plt.plot(pdf_years['date'], pdf_years['avg_temp'], color='black', label='Avg')
-    plt.plot(pdf_years['date'], pdf_years['min_temp'], color='blue', label='Min')
-    plt.title(f"{station_name}: Yearly Aggregates (Last 20 Years)")
-    plt.legend()
-    plt.show()
+    if not pdf_years.empty:
+        plt.figure(4, figsize=(10, 5))
+        plt.plot(pdf_years['date'], pdf_years['max_temp'], color='red', label='Max')
+        plt.plot(pdf_years['date'], pdf_years['avg_temp'], color='black', label='Avg')
+        plt.plot(pdf_years['date'], pdf_years['min_temp'], color='blue', label='Min')
+        plt.title(f"{station_name}: Yearly Aggregates (Last 20 Years)")
+        plt.legend()
+        plt.tight_layout()
+        plt.show()

 def task_11b_rank(spark: SparkSession):
     print("\n--- Aufgabe 11b: TempMonat Ranking ---")
-    start = time.time()

     q_tempmonat = """
         SELECT
@@ -194,13 +213,12 @@ def task_11b_rank(spark: SparkSession):
     """
     spark.sql(q_rank_global).show(10)
-    print(f"Dauer 11b: {time.time() - start:.2f}s")
-    input(">> 11b: Check Spark UI (Jobs/Stages). Enter...")
+    print("11b: Fertig.")

 def task_11c_groupingsets(spark: SparkSession):
     print("\n--- Aufgabe 11c: Grouping Sets ---")
-    start = time.time()

     q_prep = """
         SELECT
@@ -234,8 +252,8 @@ def task_11c_groupingsets(spark: SparkSession):
     df_gs.createOrReplaceTempView("grouping_result")

     # Action zum Cachen
     df_gs.count()
-    print(f"Grouping Sets berechnet. Dauer: {time.time() - start:.2f}s")
+    print("Grouping Sets berechnet.")

     print("Auswahl 1: Jahr & Bundesland")
     spark.sql("SELECT year, bundesland, avg_t FROM grouping_result WHERE station_name IS NULL AND month IS NULL ORDER BY year DESC, bundesland").show(5)
@@ -245,122 +263,14 @@ def task_11c_groupingsets(spark: SparkSession):
     print("Auswahl 3: Monat & Bundesland (Jahreszeitlicher Verlauf je Land)")
     spark.sql("SELECT month, bundesland, avg_t FROM grouping_result WHERE year IS NULL AND station_name IS NULL ORDER BY bundesland, month").show(5)
-    input(">> 11c: Check Spark UI (Zugriffspläne/Storage). Enter...")
-
-# [removed: the entire AUFGABE-12 block (task_12_stocks_analysis), moved unchanged to the new file "Aufgabe 12/Aufgabe12.py" below]

 def main(scon, spark):
-    init_view_stations(spark)
+    read_parquets(spark)

     # Aufgabe 11
     task_11a_rollup(spark, station_name="Kempten")
     task_11b_rank(spark)
     task_11c_groupingsets(spark)

-    # Aufgabe 12
-    init_view_stocks(spark)
-    task_12_stocks_analysis(spark)

 if __name__ == '__main__':
     main(scon, spark)
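The ranking queries of task_11b (q_tempmonat, q_rank_global) are largely elided by this diff. As a rough illustration of the technique, a hypothetical per-month ranking with a window function, using only names that appear elsewhere in this compare view (german_stations_data with stationId, date, TT_TU); it is not the commit's literal query:

    # Hypothetical sketch of a monthly temperature ranking via RANK() OVER;
    # the commit's actual 11b queries are elided between the hunks above.
    q_rank_sketch = """
        SELECT stationId, mo, avg_temp,
               RANK() OVER (PARTITION BY mo ORDER BY avg_temp DESC) AS rnk
        FROM (
            SELECT stationId,
                   MONTH(TO_DATE(date, 'yyyyMMdd')) AS mo,
                   AVG(TT_TU) AS avg_temp
            FROM german_stations_data
            WHERE TT_TU IS NOT NULL
            GROUP BY stationId, MONTH(TO_DATE(date, 'yyyyMMdd'))
        ) t
    """
    # spark.sql(q_rank_sketch).show(10)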

Aufgabe 12/Aufgabe12.py · new file · 123 lines
View File

@@ -0,0 +1,123 @@
from sparkstart import scon, spark
from pyspark.sql import SparkSession
import time
import matplotlib.pyplot as plt
import pandas as pd

HDFSPATH_STATIONS = "hdfs://193.174.205.250:54310/home/heiserervalentin/"
HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/"

def init_view_stocks(spark):
    """Lädt die Stocks-Daten für Aufgabe 12"""
    # Hinweis: Pfade anpassen, falls sie im Cluster anders liegen
    spark.read.parquet(HDFSPATH_STOCKS + "stocks.parquet").createOrReplaceTempView("stocks")
    spark.read.parquet(HDFSPATH_STOCKS + "portfolio.parquet").createOrReplaceTempView("portfolio")

# ---------------------------------------------------------
# AUFGABE 12
# ---------------------------------------------------------
def task_12_stocks_analysis(spark: SparkSession):
    print("\n--- Aufgabe 12: Stocks & Portfolio ---")

    # a) Erstes und letztes Datum je Symbol
    print("a) Min/Max Datum pro Symbol")
    t0 = time.time()
    q_a = """
        SELECT symbol, MIN(date) as first_date, MAX(date) as last_date
        FROM stocks
        GROUP BY symbol
        ORDER BY symbol
    """
    spark.sql(q_a).show(5)
    print(f"Zeit a): {time.time()-t0:.2f}s")

    # b) Aggregationen 2009
    print("\nb) High/Low/Avg Close 2009")
    t0 = time.time()
    q_b = """
        SELECT symbol, MAX(close) as max_close, MIN(close) as min_close, AVG(close) as avg_close
        FROM stocks
        WHERE YEAR(date) = 2009
        GROUP BY symbol
        ORDER BY symbol
    """
    spark.sql(q_b).show(5)
    print(f"Zeit b): {time.time()-t0:.2f}s")

    # c) Lateral View (Explode Portfolio)
    print("\nc) Lateral View: Aktien in Portfolios")
    t0 = time.time()
    q_c = """
        SELECT
            h.symbol,
            SUM(h.amount) as total_shares,
            COUNT(p.portfolioId) as num_portfolios,
            AVG(h.amount) as avg_per_portfolio
        FROM portfolio p
        LATERAL VIEW explode(holdings) t AS h
        GROUP BY h.symbol
        ORDER BY h.symbol
    """
    spark.sql(q_c).show(5)
    print(f"Zeit c): {time.time()-t0:.2f}s")

    # d) Symbole in keinem Portfolio (Anti Join)
    print("\nd) Symbole ohne Portfolio")
    t0 = time.time()
    q_d = """
        SELECT DISTINCT s.symbol
        FROM stocks s
        LEFT ANTI JOIN (
            SELECT DISTINCT h.symbol
            FROM portfolio p
            LATERAL VIEW explode(holdings) t AS h
        ) p_sym ON s.symbol = p_sym.symbol
        ORDER BY s.symbol
    """
    spark.sql(q_d).show(5)
    print(f"Zeit d): {time.time()-t0:.2f}s")

    input(">> 12 a-d fertig. Check UI. Enter für e)...")

    # e) Portfolio Wert Ende 2010
    print("\ne) Portfolio Bewertung Ende 2010")
    t0 = time.time()
    q_last_price = """
        SELECT symbol, close
        FROM (
            SELECT
                symbol,
                close,
                ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY date DESC) as rn
            FROM stocks
            WHERE YEAR(date) = 2010
        ) tmp
        WHERE rn = 1
    """
    spark.sql(q_last_price).createOrReplaceTempView("stocks_2010_end")

    # Schritt 2: Portfolio explodieren, mit Preis joinen, berechnen, summieren
    q_val = """
        SELECT
            p.portfolioId,
            SUM(h.amount * s.close) as portfolio_value_2010
        FROM portfolio p
        LATERAL VIEW explode(holdings) t AS h
        JOIN stocks_2010_end s ON h.symbol = s.symbol
        GROUP BY p.portfolioId
        ORDER BY p.portfolioId
    """
    spark.sql(q_val).show(5)
    print(f"Zeit e): {time.time()-t0:.2f}s")

def main(scon, spark):
    # Aufgabe 12
    init_view_stocks(spark)
    task_12_stocks_analysis(spark)

if __name__ == '__main__':
    main(scon, spark)
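The LATERAL VIEW explode aggregation from 12c can also be written with the DataFrame API. A minimal sketch under the same schema assumption as the SQL above (portfolio has a holdings array of structs with symbol and amount); this is an illustration, not part of the committed file:

    # DataFrame-API sketch equivalent to the LATERAL VIEW explode(holdings) query in 12c.
    from pyspark.sql import functions as F

    def holdings_per_symbol(spark):
        # Assumes the "portfolio" temp view registered by init_view_stocks().
        exploded = (spark.table("portfolio")
                    .select("portfolioId", F.explode("holdings").alias("h"))
                    .select("portfolioId",
                            F.col("h.symbol").alias("symbol"),
                            F.col("h.amount").alias("amount")))
        return (exploded.groupBy("symbol")
                .agg(F.sum("amount").alias("total_shares"),
                     F.count("portfolioId").alias("num_portfolios"),
                     F.avg("amount").alias("avg_per_portfolio"))
                .orderBy("symbol"))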

Aufgabe 12/sparkstart.py · new file · 22 lines
View File

@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
Erzeugen einer Spark-Konfiguration
"""
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# connect to cluster
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.executor.memory", '32g')
conf.set("spark.driver.memory", '8g')
conf.set("spark.cores.max", "40")
scon = SparkContext(conf=conf)
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL") \
    .getOrCreate()
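Note that the SparkConf is applied to the SparkContext and the SparkSession builder then reuses that already-running context, so the appName set in the builder has no effect on the running application. A minimal alternative sketch (an assumption, not part of the committed sparkstart.py) that builds the session directly from the same conf so both share one configuration:

    # Alternative sketch: create the SparkSession straight from the SparkConf so the
    # Kryo/memory/core settings and the session share one configuration.
    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    conf = (SparkConf()
            .setMaster("spark://193.174.205.250:7077")
            .setAppName("HeisererValentin")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.executor.memory", "32g")
            .set("spark.driver.memory", "8g")
            .set("spark.cores.max", "40"))

    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    scon = spark.sparkContext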