mirror of
https://github.com/Vale54321/BigData.git
synced 2025-12-15 11:29:32 +01:00
Compare commits
4 Commits
de3782d570
...
1b2de95b2e
| Author | SHA1 | Date | |
|---|---|---|---|
| 1b2de95b2e | |||
| d18e9823e5 | |||
| 2b26188647 | |||
| f89d39d420 |
@@ -5,7 +5,6 @@ import matplotlib.pyplot as plt
|
||||
|
||||
HDFSPATH = "hdfs://193.174.205.250:54310/"
|
||||
|
||||
|
||||
def read_parquets(spark: SparkSession):
|
||||
stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
|
||||
products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
|
||||
@@ -59,101 +58,96 @@ def duration_circle_size(spark: SparkSession):
|
||||
|
||||
|
||||
def compute_daily_and_yearly_frosts(spark: SparkSession):
|
||||
q_daily_max = (
|
||||
"SELECT stationId, date, SUBSTR(CAST(date AS STRING),1,4) AS year, MAX(TT_TU) AS max_temp "
|
||||
"FROM german_stations_data "
|
||||
"WHERE TT_TU IS NOT NULL "
|
||||
"GROUP BY stationId, date"
|
||||
)
|
||||
daily_max = spark.sql(q_daily_max)
|
||||
daily_max.createOrReplaceTempView('daily_max')
|
||||
q_daily_max = (
|
||||
"SELECT stationId, date, SUBSTR(CAST(date AS STRING),1,4) AS year, MAX(TT_TU) AS max_temp "
|
||||
"FROM german_stations_data "
|
||||
"WHERE TT_TU IS NOT NULL AND TT_TU > -50 AND TT_TU < 60 "
|
||||
"GROUP BY stationId, date"
|
||||
)
|
||||
daily_max = spark.sql(q_daily_max)
|
||||
daily_max.createOrReplaceTempView('daily_max')
|
||||
|
||||
# mark a day as frost if max_temp < 0
|
||||
q_daily_frost = (
|
||||
"SELECT stationId, year, CASE WHEN max_temp < 0 THEN 1 ELSE 0 END AS is_frost "
|
||||
"FROM daily_max"
|
||||
)
|
||||
daily_frost = spark.sql(q_daily_frost)
|
||||
daily_frost.createOrReplaceTempView('daily_frost')
|
||||
# mark a day as frost if max_temp < 0
|
||||
q_daily_frost = (
|
||||
"SELECT stationId, year, CASE WHEN max_temp < 0 THEN 1 ELSE 0 END AS is_frost "
|
||||
"FROM daily_max"
|
||||
)
|
||||
daily_frost = spark.sql(q_daily_frost)
|
||||
daily_frost.createOrReplaceTempView('daily_frost')
|
||||
|
||||
# yearly frostdays per station
|
||||
q_station_year = (
|
||||
"SELECT stationId, year, SUM(is_frost) AS frost_days "
|
||||
"FROM daily_frost GROUP BY stationId, year"
|
||||
)
|
||||
station_year_frost = spark.sql(q_station_year)
|
||||
station_year_frost.createOrReplaceTempView('station_year_frost')
|
||||
# yearly frostdays per station
|
||||
q_station_year = (
|
||||
"SELECT stationId, year, SUM(is_frost) AS frost_days "
|
||||
"FROM daily_frost GROUP BY stationId, year"
|
||||
)
|
||||
station_year_frost = spark.sql(q_station_year)
|
||||
station_year_frost.createOrReplaceTempView('station_year_frost')
|
||||
|
||||
|
||||
def frost_analysis(spark: SparkSession, year=2024, station_name_matches=('kempten',)):
|
||||
compute_daily_and_yearly_frosts(spark)
|
||||
compute_daily_and_yearly_frosts(spark)
|
||||
|
||||
# Debug: check available years and data
|
||||
spark.sql("SELECT year, COUNT(*) as cnt FROM station_year_frost GROUP BY year ORDER BY year").show(50)
|
||||
q_hist = (
|
||||
f"SELECT frost_days, COUNT(*) AS station_count "
|
||||
f"FROM station_year_frost WHERE year = '{year}' GROUP BY frost_days ORDER BY frost_days"
|
||||
)
|
||||
hist_df = spark.sql(q_hist)
|
||||
|
||||
q_hist = (
|
||||
f"SELECT frost_days, COUNT(*) AS station_count "
|
||||
f"FROM station_year_frost WHERE year = '{year}' GROUP BY frost_days ORDER BY frost_days"
|
||||
)
|
||||
hist_df = spark.sql(q_hist)
|
||||
hist_pdf = hist_df.toPandas()
|
||||
|
||||
hist_pdf = hist_df.toPandas()
|
||||
if hist_pdf.empty:
|
||||
print(f"No frost data found for year {year}. Trying to find available years...")
|
||||
q_all = "SELECT frost_days, COUNT(*) AS station_count FROM station_year_frost GROUP BY frost_days ORDER BY frost_days"
|
||||
hist_pdf = spark.sql(q_all).toPandas()
|
||||
if hist_pdf.empty:
|
||||
print("No frost data available at all. Check if TT_TU column contains valid temperature data.")
|
||||
return
|
||||
print(f"Found {len(hist_pdf)} frost day categories across all years")
|
||||
|
||||
if hist_pdf.empty:
|
||||
print(f"No frost data found for year {year}. Trying to find available years...")
|
||||
# Try without year filter to see if data exists
|
||||
q_all = "SELECT frost_days, COUNT(*) AS station_count FROM station_year_frost GROUP BY frost_days ORDER BY frost_days"
|
||||
hist_pdf = spark.sql(q_all).toPandas()
|
||||
if hist_pdf.empty:
|
||||
print("No frost data available at all. Check if TT_TU column contains valid temperature data.")
|
||||
return
|
||||
print(f"Found {len(hist_pdf)} frost day categories across all years")
|
||||
plt.figure(figsize=(8, 5))
|
||||
plt.bar(hist_pdf.frost_days, hist_pdf.station_count, color='steelblue')
|
||||
plt.xlabel('Number of Frost Days in year ' + str(year))
|
||||
plt.ylabel('Number of Stations')
|
||||
plt.title(f'Stations vs Frost Days ({year})')
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
|
||||
plt.figure(figsize=(8, 5))
|
||||
plt.bar(hist_pdf.frost_days, hist_pdf.station_count, color='steelblue')
|
||||
plt.xlabel('Number of Frost Days in year ' + str(year))
|
||||
plt.ylabel('Number of Stations')
|
||||
plt.title(f'Stations vs Frost Days ({year})')
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
for name in station_name_matches:
|
||||
q_find = f"SELECT stationId, station_name FROM german_stations WHERE lower(station_name) LIKE '%{name.lower()}%'"
|
||||
ids_df = spark.sql(q_find)
|
||||
ids = ids_df.collect()
|
||||
if not ids:
|
||||
print(f"No stations found matching '{name}'")
|
||||
continue
|
||||
for r in ids:
|
||||
sid = r['stationId']
|
||||
sname = r['station_name']
|
||||
print(f"Analyzing stationId={sid} name={sname}")
|
||||
|
||||
for name in station_name_matches:
|
||||
q_find = f"SELECT stationId, station_name FROM german_stations WHERE lower(station_name) LIKE '%{name.lower()}%'"
|
||||
ids_df = spark.sql(q_find)
|
||||
ids = ids_df.collect()
|
||||
if not ids:
|
||||
print(f"No stations found matching '{name}'")
|
||||
continue
|
||||
for r in ids:
|
||||
sid = r['stationId']
|
||||
sname = r['station_name']
|
||||
print(f"Analyzing stationId={sid} name={sname}")
|
||||
q_ts = (
|
||||
"SELECT year, frost_days, "
|
||||
"AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_5, "
|
||||
"AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) RANGE BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg_20 "
|
||||
f"FROM station_year_frost WHERE stationId = {sid} ORDER BY CAST(year AS INT)"
|
||||
)
|
||||
ts_df = spark.sql(q_ts)
|
||||
|
||||
# compute frostdays + 5-yr and 20-yr rolling averages using window frame
|
||||
q_ts = (
|
||||
"SELECT year, frost_days, "
|
||||
"AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_5, "
|
||||
"AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg_20 "
|
||||
f"FROM station_year_frost WHERE stationId = {sid} ORDER BY CAST(year AS INT)"
|
||||
)
|
||||
ts_df = spark.sql(q_ts)
|
||||
pdf = ts_df.toPandas()
|
||||
if pdf.empty:
|
||||
print(f"No yearly frost data for station {sid}")
|
||||
continue
|
||||
|
||||
pdf = ts_df.toPandas()
|
||||
if pdf.empty:
|
||||
print(f"No yearly frost data for station {sid}")
|
||||
continue
|
||||
|
||||
pdf['year'] = pdf['year'].astype(int)
|
||||
plt.figure(figsize=(10, 5))
|
||||
plt.plot(pdf.year, pdf.frost_days, label='Frostdays (year)', marker='o')
|
||||
plt.plot(pdf.year, pdf.avg_5, label='5-year avg', linestyle='--')
|
||||
plt.plot(pdf.year, pdf.avg_20, label='20-year avg', linestyle=':')
|
||||
plt.xlabel('Year')
|
||||
plt.ylabel('Frost Days')
|
||||
plt.title(f'Frost Days over Years for {sname} (station {sid})')
|
||||
plt.legend()
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
pdf['year'] = pdf['year'].astype(int)
|
||||
plt.figure(figsize=(10, 5))
|
||||
plt.plot(pdf.year, pdf.frost_days, label='Frostdays (year)', marker='o')
|
||||
plt.plot(pdf.year, pdf.avg_5, label='5-year avg', linestyle='--')
|
||||
plt.plot(pdf.year, pdf.avg_20, label='20-year avg', linestyle=':')
|
||||
plt.xlabel('Year')
|
||||
plt.ylabel('Frost Days')
|
||||
plt.title(f'Frost Days over Years for {sname} (station {sid})')
|
||||
plt.legend()
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
|
||||
|
||||
def height_frost_correlation(spark: SparkSession):
|
||||
|
||||
276
Aufgabe 11/Aufgabe11.py
Normal file
276
Aufgabe 11/Aufgabe11.py
Normal file
@@ -0,0 +1,276 @@
|
||||
from sparkstart import scon, spark
|
||||
from pyspark.sql import SparkSession
|
||||
import matplotlib.pyplot as plt
|
||||
import pandas as pd
|
||||
|
||||
HDFSPATH = "hdfs://193.174.205.250:54310/"
|
||||
HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/"
|
||||
|
||||
def read_parquets(spark: SparkSession):
|
||||
stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
|
||||
products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
|
||||
|
||||
stations_df = spark.read.parquet(stations_path)
|
||||
stations_df.createOrReplaceTempView("german_stations")
|
||||
|
||||
products_df = spark.read.parquet(products_path)
|
||||
products_df.createOrReplaceTempView("german_stations_data")
|
||||
|
||||
stations_df.cache()
|
||||
products_df.cache()
|
||||
|
||||
def task_11a_rollup(spark: SparkSession, station_name="Kempten"):
|
||||
print(f"\n--- Aufgabe 11a: Rollup & Plotting für {station_name} ---")
|
||||
start_time = time.time()
|
||||
|
||||
# 1. Station ID finden
|
||||
# Case-insensitive search
|
||||
sid_df = spark.sql(f"SELECT stationId FROM german_stations WHERE lower(station_name) LIKE '%{station_name.lower()}%'")
|
||||
try:
|
||||
sid = sid_df.collect()[0]['stationId']
|
||||
print(f"Station found: {station_name} -> ID {sid}")
|
||||
except IndexError:
|
||||
print(f"Station {station_name} nicht gefunden.")
|
||||
return
|
||||
|
||||
# 2. Rollup Query vorbereiten
|
||||
# FIX: Parse string date 'YYYYMMDD' to real DATE object first
|
||||
q_prep = f"""
|
||||
SELECT
|
||||
YEAR(TO_DATE(date, 'yyyyMMdd')) as yr,
|
||||
QUARTER(TO_DATE(date, 'yyyyMMdd')) as qt,
|
||||
MONTH(TO_DATE(date, 'yyyyMMdd')) as mo,
|
||||
DAY(TO_DATE(date, 'yyyyMMdd')) as da,
|
||||
TT_TU
|
||||
FROM german_stations_data
|
||||
WHERE stationId = {sid}
|
||||
AND TT_TU IS NOT NULL
|
||||
AND TT_TU > -50
|
||||
AND TT_TU < 60
|
||||
"""
|
||||
spark.sql(q_prep).createOrReplaceTempView("data_prep")
|
||||
|
||||
# 3. Rollup Execution
|
||||
# Note: We use string construction for quarters/months to ensure we get a valid date string for plotting
|
||||
q_rollup = """
|
||||
SELECT
|
||||
yr, qt, mo, da,
|
||||
MIN(TT_TU) as min_temp,
|
||||
MAX(TT_TU) as max_temp,
|
||||
AVG(TT_TU) as avg_temp,
|
||||
|
||||
-- Construct dates for plotting (handling the NULLs from ROLLUP)
|
||||
-- For Quarter: Use 1st month of quarter
|
||||
DATE(concat_ws('-', yr, cast(qt*3-2 as int), '01')) as qt_date,
|
||||
-- For Month: Use 1st day of month
|
||||
MAKE_DATE(yr, mo, 1) as mo_date,
|
||||
-- For Year: Use Jan 1st
|
||||
MAKE_DATE(yr, 1, 1) as yr_date,
|
||||
-- For Day: Use actual date
|
||||
MAKE_DATE(yr, mo, da) as da_date
|
||||
|
||||
FROM data_prep
|
||||
GROUP BY ROLLUP(yr, qt, mo, da)
|
||||
"""
|
||||
|
||||
df_rollup = spark.sql(q_rollup)
|
||||
df_rollup.cache()
|
||||
df_rollup.createOrReplaceTempView("station_rollup")
|
||||
|
||||
# Trigger Action
|
||||
count = df_rollup.count()
|
||||
print(f"Rollup berechnet. Zeilen: {count}. Dauer: {time.time() - start_time:.2f}s")
|
||||
|
||||
# --- PLOTTING ---
|
||||
|
||||
# Plot 1: Tageswerte (letzte 3 Jahre)
|
||||
# Filter: All levels must be present (not null)
|
||||
q_days = """
|
||||
SELECT da_date as date, avg_temp
|
||||
FROM station_rollup
|
||||
WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NOT NULL
|
||||
AND yr >= (SELECT MAX(yr) - 2 FROM station_rollup WHERE yr IS NOT NULL)
|
||||
ORDER BY date
|
||||
"""
|
||||
pdf_days = spark.sql(q_days).toPandas()
|
||||
|
||||
if pdf_days.empty:
|
||||
print("Warnung: Keine Daten für Tages-Plot gefunden.")
|
||||
else:
|
||||
plt.figure(1, figsize=(10, 5))
|
||||
plt.plot(pdf_days['date'], pdf_days['avg_temp'], label='Daily Avg', linewidth=0.5)
|
||||
plt.title(f"{station_name}: Daily Average (Last 3 Years)")
|
||||
plt.xlabel('Date')
|
||||
plt.ylabel('Temp °C')
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
|
||||
# Plot 2: Monatswerte (10-20 Jahre)
|
||||
# Filter: Day is NULL (aggregation level), but Month is NOT NULL
|
||||
q_months = """
|
||||
SELECT mo_date as date, avg_temp
|
||||
FROM station_rollup
|
||||
WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NOT NULL AND da IS NULL
|
||||
AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL)
|
||||
ORDER BY date
|
||||
"""
|
||||
pdf_months = spark.sql(q_months).toPandas()
|
||||
|
||||
if not pdf_months.empty:
|
||||
plt.figure(2, figsize=(10, 5))
|
||||
plt.plot(pdf_months['date'], pdf_months['avg_temp'], color='green', label='Monthly Avg')
|
||||
plt.title(f"{station_name}: Monthly Average (Last 20 Years)")
|
||||
plt.xlabel('Date')
|
||||
plt.ylabel('Temp °C')
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
|
||||
# Plot 3: Quartalswerte
|
||||
# Filter: Month is NULL, Quarter is NOT NULL
|
||||
q_quarters = """
|
||||
SELECT qt_date as date, avg_temp
|
||||
FROM station_rollup
|
||||
WHERE yr IS NOT NULL AND qt IS NOT NULL AND mo IS NULL AND da IS NULL
|
||||
AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL)
|
||||
ORDER BY date
|
||||
"""
|
||||
pdf_quarters = spark.sql(q_quarters).toPandas()
|
||||
|
||||
if not pdf_quarters.empty:
|
||||
plt.figure(3, figsize=(10, 5))
|
||||
plt.plot(pdf_quarters['date'], pdf_quarters['avg_temp'], color='orange', marker='o', linestyle='-', label='Quarterly Avg')
|
||||
plt.title(f"{station_name}: Quarterly Average (Last 20 Years)")
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
|
||||
# Plot 4: Jahreswerte
|
||||
# Filter: Quarter is NULL, Year is NOT NULL
|
||||
q_years = """
|
||||
SELECT yr_date as date, min_temp, max_temp, avg_temp
|
||||
FROM station_rollup
|
||||
WHERE yr IS NOT NULL AND qt IS NULL AND mo IS NULL AND da IS NULL
|
||||
AND yr >= (SELECT MAX(yr) - 20 FROM station_rollup WHERE yr IS NOT NULL)
|
||||
ORDER BY date
|
||||
"""
|
||||
pdf_years = spark.sql(q_years).toPandas()
|
||||
|
||||
if not pdf_years.empty:
|
||||
plt.figure(4, figsize=(10, 5))
|
||||
plt.plot(pdf_years['date'], pdf_years['max_temp'], color='red', label='Max')
|
||||
plt.plot(pdf_years['date'], pdf_years['avg_temp'], color='black', label='Avg')
|
||||
plt.plot(pdf_years['date'], pdf_years['min_temp'], color='blue', label='Min')
|
||||
plt.title(f"{station_name}: Yearly Aggregates (Last 20 Years)")
|
||||
plt.legend()
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
|
||||
|
||||
def task_11b_rank(spark: SparkSession):
|
||||
print("\n--- Aufgabe 11b: TempMonat Ranking ---")
|
||||
|
||||
|
||||
q_tempmonat = """
|
||||
SELECT
|
||||
d.stationId,
|
||||
s.station_name,
|
||||
SUBSTR(CAST(d.date AS STRING), 1, 4) as year,
|
||||
SUBSTR(CAST(d.date AS STRING), 6, 2) as month,
|
||||
MIN(d.TT_TU) as min_t,
|
||||
MAX(d.TT_TU) as max_t,
|
||||
AVG(d.TT_TU) as avg_t
|
||||
FROM german_stations_data d
|
||||
JOIN german_stations s ON d.stationId = s.stationId
|
||||
WHERE d.TT_TU IS NOT NULL AND d.TT_TU > -50
|
||||
GROUP BY d.stationId, s.station_name, year, month
|
||||
"""
|
||||
df_tm = spark.sql(q_tempmonat)
|
||||
df_tm.createOrReplaceTempView("tempmonat")
|
||||
|
||||
# 1. Ranking Partitioniert nach Monat im Jahr 2015
|
||||
print(" > Berechne Ranking für 2015 (partitioniert nach Monat)...")
|
||||
q_rank_2015 = """
|
||||
SELECT
|
||||
month, station_name, min_t,
|
||||
RANK() OVER (PARTITION BY month ORDER BY min_t ASC) as rank_min,
|
||||
RANK() OVER (PARTITION BY month ORDER BY max_t ASC) as rank_max,
|
||||
RANK() OVER (PARTITION BY month ORDER BY avg_t ASC) as rank_avg
|
||||
FROM tempmonat
|
||||
WHERE year = '2015'
|
||||
ORDER BY rank_min, month
|
||||
"""
|
||||
spark.sql(q_rank_2015).show(10)
|
||||
|
||||
# 2. Globales Ranking (über alle Monate/Jahre hinweg)
|
||||
print(" > Berechne Ranking global (kälteste Monate aller Zeiten)...")
|
||||
q_rank_global = """
|
||||
SELECT
|
||||
year, month, station_name, min_t,
|
||||
RANK() OVER (ORDER BY min_t ASC) as rank_min,
|
||||
RANK() OVER (ORDER BY max_t ASC) as rank_max,
|
||||
RANK() OVER (ORDER BY avg_t ASC) as rank_avg
|
||||
FROM tempmonat
|
||||
ORDER BY rank_min
|
||||
"""
|
||||
spark.sql(q_rank_global).show(10)
|
||||
|
||||
print("11b: Fertig.")
|
||||
|
||||
|
||||
def task_11c_groupingsets(spark: SparkSession):
|
||||
print("\n--- Aufgabe 11c: Grouping Sets ---")
|
||||
|
||||
|
||||
q_prep = """
|
||||
SELECT
|
||||
CAST(SUBSTR(CAST(d.date AS STRING), 1, 4) AS INT) as year,
|
||||
CAST(SUBSTR(CAST(d.date AS STRING), 6, 2) AS INT) as month,
|
||||
s.station_name,
|
||||
s.bundesland,
|
||||
d.TT_TU
|
||||
FROM german_stations_data d
|
||||
JOIN german_stations s ON d.stationId = s.stationId
|
||||
WHERE d.TT_TU > -50
|
||||
"""
|
||||
spark.sql(q_prep).createOrReplaceTempView("gs_base")
|
||||
|
||||
q_sets = """
|
||||
SELECT
|
||||
year,
|
||||
month,
|
||||
bundesland,
|
||||
station_name,
|
||||
MIN(TT_TU) as min_t, MAX(TT_TU) as max_t, AVG(TT_TU) as avg_t
|
||||
FROM gs_base
|
||||
GROUP BY GROUPING SETS (
|
||||
(year, bundesland),
|
||||
(year, station_name),
|
||||
(month, bundesland)
|
||||
)
|
||||
"""
|
||||
df_gs = spark.sql(q_sets)
|
||||
df_gs.cache()
|
||||
df_gs.createOrReplaceTempView("grouping_result")
|
||||
|
||||
# Action zum Cachen
|
||||
df_gs.count()
|
||||
print("Grouping Sets berechnet.")
|
||||
|
||||
print("Auswahl 1: Jahr & Bundesland")
|
||||
spark.sql("SELECT year, bundesland, avg_t FROM grouping_result WHERE station_name IS NULL AND month IS NULL ORDER BY year DESC, bundesland").show(5)
|
||||
|
||||
print("Auswahl 2: Jahr & Station")
|
||||
spark.sql("SELECT year, station_name, avg_t FROM grouping_result WHERE bundesland IS NULL AND month IS NULL ORDER BY year DESC, station_name").show(5)
|
||||
|
||||
print("Auswahl 3: Monat & Bundesland (Jahreszeitlicher Verlauf je Land)")
|
||||
spark.sql("SELECT month, bundesland, avg_t FROM grouping_result WHERE year IS NULL AND station_name IS NULL ORDER BY bundesland, month").show(5)
|
||||
|
||||
def main(scon, spark):
|
||||
read_parquets(spark)
|
||||
|
||||
# Aufgabe 11
|
||||
task_11a_rollup(spark, station_name="Kempten")
|
||||
task_11b_rank(spark)
|
||||
task_11c_groupingsets(spark)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(scon, spark)
|
||||
22
Aufgabe 11/sparkstart.py
Normal file
22
Aufgabe 11/sparkstart.py
Normal file
@@ -0,0 +1,22 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
Erzeugen einer Spark-Konfiguration
|
||||
"""
|
||||
|
||||
from pyspark import SparkConf, SparkContext
|
||||
from pyspark.sql import SparkSession
|
||||
|
||||
# connect to cluster
|
||||
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
|
||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
||||
conf.set("spark.executor.memory", '32g')
|
||||
conf.set("spark.driver.memory", '8g')
|
||||
conf.set("spark.cores.max", "40")
|
||||
scon = SparkContext(conf=conf)
|
||||
|
||||
|
||||
spark = SparkSession \
|
||||
.builder \
|
||||
.appName("Python Spark SQL") \
|
||||
.getOrCreate()
|
||||
123
Aufgabe 12/Aufgabe12.py
Normal file
123
Aufgabe 12/Aufgabe12.py
Normal file
@@ -0,0 +1,123 @@
|
||||
from sparkstart import scon, spark
|
||||
from pyspark.sql import SparkSession
|
||||
import time
|
||||
import matplotlib.pyplot as plt
|
||||
import pandas as pd
|
||||
|
||||
HDFSPATH_STATIONS = "hdfs://193.174.205.250:54310/home/heiserervalentin/"
|
||||
HDFSPATH_STOCKS = "hdfs://193.174.205.250:54310/stocks/"
|
||||
|
||||
def init_view_stocks(spark):
|
||||
"""Lädt die Stocks-Daten für Aufgabe 12"""
|
||||
# Hinweis: Pfade anpassen, falls sie im Cluster anders liegen
|
||||
spark.read.parquet(HDFSPATH_STOCKS + "stocks.parquet").createOrReplaceTempView("stocks")
|
||||
spark.read.parquet(HDFSPATH_STOCKS + "portfolio.parquet").createOrReplaceTempView("portfolio")
|
||||
|
||||
# ---------------------------------------------------------
|
||||
# AUFGABE 12
|
||||
# ---------------------------------------------------------
|
||||
|
||||
def task_12_stocks_analysis(spark: SparkSession):
|
||||
print("\n--- Aufgabe 12: Stocks & Portfolio ---")
|
||||
|
||||
# a) Erstes und letztes Datum je Symbol
|
||||
print("a) Min/Max Datum pro Symbol")
|
||||
t0 = time.time()
|
||||
q_a = """
|
||||
SELECT symbol, MIN(date) as first_date, MAX(date) as last_date
|
||||
FROM stocks
|
||||
GROUP BY symbol
|
||||
ORDER BY symbol
|
||||
"""
|
||||
spark.sql(q_a).show(5)
|
||||
print(f"Zeit a): {time.time()-t0:.2f}s")
|
||||
|
||||
# b) Aggregationen 2009
|
||||
print("\nb) High/Low/Avg Close 2009")
|
||||
t0 = time.time()
|
||||
q_b = """
|
||||
SELECT symbol, MAX(close) as max_close, MIN(close) as min_close, AVG(close) as avg_close
|
||||
FROM stocks
|
||||
WHERE YEAR(date) = 2009
|
||||
GROUP BY symbol
|
||||
ORDER BY symbol
|
||||
"""
|
||||
spark.sql(q_b).show(5)
|
||||
print(f"Zeit b): {time.time()-t0:.2f}s")
|
||||
|
||||
# c) Lateral View (Explode Portfolio)
|
||||
print("\nc) Lateral View: Aktien in Portfolios")
|
||||
t0 = time.time()
|
||||
|
||||
q_c = """
|
||||
SELECT
|
||||
h.symbol,
|
||||
SUM(h.amount) as total_shares,
|
||||
COUNT(p.portfolioId) as num_portfolios,
|
||||
AVG(h.amount) as avg_per_portfolio
|
||||
FROM portfolio p
|
||||
LATERAL VIEW explode(holdings) t AS h
|
||||
GROUP BY h.symbol
|
||||
ORDER BY h.symbol
|
||||
"""
|
||||
spark.sql(q_c).show(5)
|
||||
print(f"Zeit c): {time.time()-t0:.2f}s")
|
||||
|
||||
# d) Symbole in keinem Portfolio (Anti Join)
|
||||
print("\nd) Symbole ohne Portfolio")
|
||||
t0 = time.time()
|
||||
q_d = """
|
||||
SELECT DISTINCT s.symbol
|
||||
FROM stocks s
|
||||
LEFT ANTI JOIN (
|
||||
SELECT DISTINCT h.symbol
|
||||
FROM portfolio p
|
||||
LATERAL VIEW explode(holdings) t AS h
|
||||
) p_sym ON s.symbol = p_sym.symbol
|
||||
ORDER BY s.symbol
|
||||
"""
|
||||
spark.sql(q_d).show(5)
|
||||
print(f"Zeit d): {time.time()-t0:.2f}s")
|
||||
|
||||
input(">> 12 a-d fertig. Check UI. Enter für e)...")
|
||||
|
||||
# e) Portfolio Wert Ende 2010
|
||||
print("\ne) Portfolio Bewertung Ende 2010")
|
||||
t0 = time.time()
|
||||
|
||||
q_last_price = """
|
||||
SELECT symbol, close
|
||||
FROM (
|
||||
SELECT
|
||||
symbol,
|
||||
close,
|
||||
ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY date DESC) as rn
|
||||
FROM stocks
|
||||
WHERE YEAR(date) = 2010
|
||||
) tmp
|
||||
WHERE rn = 1
|
||||
"""
|
||||
spark.sql(q_last_price).createOrReplaceTempView("stocks_2010_end")
|
||||
|
||||
# Schritt 2: Portfolio explodieren, mit Preis joinen, berechnen, summieren
|
||||
q_val = """
|
||||
SELECT
|
||||
p.portfolioId,
|
||||
SUM(h.amount * s.close) as portfolio_value_2010
|
||||
FROM portfolio p
|
||||
LATERAL VIEW explode(holdings) t AS h
|
||||
JOIN stocks_2010_end s ON h.symbol = s.symbol
|
||||
GROUP BY p.portfolioId
|
||||
ORDER BY p.portfolioId
|
||||
"""
|
||||
spark.sql(q_val).show(5)
|
||||
print(f"Zeit e): {time.time()-t0:.2f}s")
|
||||
|
||||
|
||||
def main(scon, spark):
|
||||
# Aufgabe 12
|
||||
init_view_stocks(spark)
|
||||
task_12_stocks_analysis(spark)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(scon, spark)
|
||||
22
Aufgabe 12/sparkstart.py
Normal file
22
Aufgabe 12/sparkstart.py
Normal file
@@ -0,0 +1,22 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
Erzeugen einer Spark-Konfiguration
|
||||
"""
|
||||
|
||||
from pyspark import SparkConf, SparkContext
|
||||
from pyspark.sql import SparkSession
|
||||
|
||||
# connect to cluster
|
||||
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
|
||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
||||
conf.set("spark.executor.memory", '32g')
|
||||
conf.set("spark.driver.memory", '8g')
|
||||
conf.set("spark.cores.max", "40")
|
||||
scon = SparkContext(conf=conf)
|
||||
|
||||
|
||||
spark = SparkSession \
|
||||
.builder \
|
||||
.appName("Python Spark SQL") \
|
||||
.getOrCreate()
|
||||
Reference in New Issue
Block a user