# Mirror of https://github.com/Vale54321/BigData.git
# Synced 2026-02-04 00:35:55 +01:00
from __future__ import annotations
|
|
from sparkstart import scon, spark
|
|
from pyspark.sql import SparkSession
|
|
import matplotlib.pyplot as plt
|
|
|
|
HDFSPATH = "hdfs://193.174.205.250:54310/"
|
|
|
|
def read_parquet_tables(spark: SparkSession) -> None:
    """Load the German weather parquet files from HDFS and register them
    as the SQL temp views ``german_stations`` and ``german_stations_data``."""
    views = {
        "german_stations": HDFSPATH + "home/heiserervalentin/german_stations.parquet",
        "german_stations_data": HDFSPATH + "home/heiserervalentin/german_stations_data.parquet",
    }
    for view_name, parquet_path in views.items():
        spark.read.parquet(parquet_path).createOrReplaceTempView(view_name)
# --- Aufgabe A ---
|
|
|
|
def create_mma_rollup(spark: SparkSession, station_id: int):
    """Compute min/max/avg temperature over a ROLLUP of
    (year, quarter, month, day, hour) for one station and register the
    result as the cached temp view ``mmacdcdata``.

    The date column arrives as a ``yyyyMMdd`` string; each hierarchy level
    is normalised to a DATE (first day of the period) so the rollup rows
    can be filtered and plotted later. Sentinel readings (-999) and NULLs
    are excluded.
    """
    sql = f"""
        WITH processed_data AS (
            SELECT
                TT_TU,
                hour AS messtunde,
                TO_DATE(SUBSTR(date, 1, 4), 'yyyy') AS jahr,
                TO_DATE(CONCAT(SUBSTR(date, 1, 4), '-', LPAD(CAST(QUARTER(TO_DATE(date, 'yyyyMMdd'))*3-2 AS STRING), 2, '0'), '-01')) AS quartal,
                TO_DATE(CONCAT(SUBSTR(date, 1, 4), '-', SUBSTR(date, 5, 2), '-01')) AS monat,
                TO_DATE(date, 'yyyyMMdd') AS tag
            FROM german_stations_data
            WHERE stationId = {station_id}
              AND TT_TU IS NOT NULL
              AND TT_TU <> -999
        )
        SELECT
            MIN(TT_TU) AS minTemperatur,
            MAX(TT_TU) AS maxTemperatur,
            AVG(TT_TU) AS avgTemperatur,
            jahr,
            quartal,
            monat,
            tag,
            messtunde
        FROM processed_data
        GROUP BY ROLLUP (jahr, quartal, monat, tag, messtunde)
        ORDER BY jahr, quartal, monat, tag, messtunde
    """
    rollup_df = spark.sql(sql)
    rollup_df.createOrReplaceTempView("mmacdcdata")
    # Cache: the view is queried once per plot level afterwards.
    rollup_df.cache()
    rollup_df.show(10)
def plot_date_values(spark: SparkSession, level: str):
    """Plot min/avg/max temperature curves from ``mmacdcdata`` for one
    rollup granularity (``"days"``, ``"months"``, ``"quartals"`` or
    ``"years"``). Does nothing when the filtered result is empty."""
    # Each filter selects exactly one rollup level: the finer columns must
    # be NULL (rolled up), the level's own column must be non-NULL.
    filters = {
        "days": "YEAR(jahr) > 2017 AND YEAR(jahr) < 2021 AND messtunde IS NULL AND tag IS NOT NULL",
        "months": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NOT NULL",
        "quartals": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NULL AND quartal IS NOT NULL",
        "years": "YEAR(jahr) > 1999 AND YEAR(jahr) < 2021 AND tag IS NULL AND monat IS NULL AND quartal IS NULL AND jahr IS NOT NULL"
    }
    x_col = {"days": "tag", "months": "monat", "quartals": "quartal", "years": "jahr"}

    pdf = spark.sql(f"SELECT * FROM mmacdcdata WHERE {filters[level]}").toPandas()
    if pdf.empty:
        return

    x_values = pdf[x_col[level]]
    plt.figure(figsize=(10, 5))
    for column, color, label in (
        ("maxTemperatur", "red", "Max"),
        ("avgTemperatur", "green", "Avg"),
        ("minTemperatur", "blue", "Min"),
    ):
        plt.plot(x_values, pdf[column], color, label=label)
    plt.title(f"{level.capitalize()}")
    plt.legend()
    plt.grid(True)
    plt.show()
# --- Aufgabe B ---
|
|
|
|
def create_tempmonat(spark: SparkSession):
    """Aggregate min/max/avg temperature per station and month (joined with
    the station master data for the station name) and register the result
    as the cached temp view ``tempmonat``."""
    sql = """
        WITH base_data AS (
            SELECT
                d.stationId,
                gs.station_name AS stationsname,
                d.TT_TU,
                TO_DATE(SUBSTR(d.date, 1, 4), 'yyyy') AS jahr_val,
                TO_DATE(CONCAT(SUBSTR(d.date, 1, 4), '-', SUBSTR(d.date, 5, 2), '-01')) AS monat_val
            FROM german_stations_data d
            JOIN german_stations gs ON d.stationId = gs.stationId
            WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999
        )
        SELECT
            stationId,
            stationsname,
            MIN(TT_TU) AS minTemperatur,
            MAX(TT_TU) AS maxTemperatur,
            AVG(TT_TU) AS avgTemperatur,
            jahr_val AS jahr,
            monat_val AS monat
        FROM base_data
        GROUP BY stationId, stationsname, jahr_val, monat_val
    """
    monthly_df = spark.sql(sql)
    # Cached because rank_temperatures queries this view twice.
    monthly_df.cache()
    monthly_df.createOrReplaceTempView("tempmonat")
def rank_temperatures(spark: SparkSession, limit: int, year: int | None = None):
    """Print a temperature ranking of the ``tempmonat`` rows.

    Each (station, month) row gets three ranks: by minimum temperature
    (ascending), by maximum temperature (descending) and by average
    temperature (descending). The output is ordered by the min-rank.

    Parameters
    ----------
    spark : SparkSession
        Active session; ``create_tempmonat`` must have run first.
    limit : int
        Number of rows passed to ``DataFrame.show``.
    year : int | None, optional
        If given, restrict the ranking to that calendar year;
        otherwise rank over all years.
    """
    # Fix: annotation was `year: int = None` (not Optional), and the filter
    # used truthiness — `is not None` states the intent explicitly.
    where_clause = f"WHERE YEAR(jahr) = {year}" if year is not None else ""
    query = f"""
        SELECT stationid, stationsname, monat, minTemperatur,
               RANK() OVER (ORDER BY minTemperatur ASC) AS rangMIN,
               maxTemperatur,
               RANK() OVER (ORDER BY maxTemperatur DESC) AS rangMAX,
               avgTemperatur,
               RANK() OVER (ORDER BY avgTemperatur DESC) AS rangAVG
        FROM tempmonat
        {where_clause}
        ORDER BY rangMIN
    """
    spark.sql(query).show(limit, truncate=False)
# --- Aufgabe C ---
|
|
|
|
def create_grouping_sets_view(spark: SparkSession):
    """Aggregate min/max/avg temperature over three GROUPING SETS —
    (bundesland, year), (station, year) and (bundesland, calendar month) —
    and register the result as the cached temp view ``tempmma_gs``."""
    sql = """
        WITH base_gs AS (
            SELECT
                d.stationId,
                -- TRIM strips whitespace so the Bundesland names group cleanly
                TRIM(gs.bundesland) AS bundesland_clean,
                d.TT_TU,
                -- Extract year and calendar month (1-12)
                YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS jahr_val,
                MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS monat_val
            FROM german_stations_data d
            JOIN german_stations gs ON d.stationId = gs.stationId
            WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999
        )
        SELECT
            stationId,
            bundesland_clean AS bundesland,
            jahr_val AS jahr,
            monat_val AS monat,
            MIN(TT_TU) AS minTemperatur,
            MAX(TT_TU) AS maxTemperatur,
            AVG(TT_TU) AS avgTemperatur
        FROM base_gs
        GROUP BY GROUPING SETS (
            (bundesland_clean, jahr_val),   -- 1. year x bundesland
            (stationId, jahr_val),          -- 2. year x station
            (bundesland_clean, monat_val)   -- 3. month x bundesland
        )
    """
    # Cache + register in one chain; the view is sliced three times afterwards.
    spark.sql(sql).cache().createOrReplaceTempView("tempmma_gs")
def show_seperate_gs(spark: SparkSession, limit: int):
    """Show each of the three grouping sets in ``tempmma_gs`` separately.

    A row belongs to exactly one grouping set; the columns that were NOT
    part of that set are NULL, which is what each WHERE clause keys on:
    1. bundesland x year   (stationId and monat NULL)
    2. station x year      (bundesland NULL)
    3. bundesland x month  (stationId and jahr NULL)
    """
    slice_queries = (
        """
        SELECT
            jahr,
            bundesland,
            minTemperatur,
            maxTemperatur,
            ROUND(avgTemperatur, 2) as avgTemperatur
        FROM tempmma_gs
        WHERE bundesland IS NOT NULL
          AND jahr IS NOT NULL
          AND stationId IS NULL
          AND monat IS NULL
        ORDER BY jahr DESC, bundesland ASC
        """,
        """
        SELECT
            jahr,
            stationId,
            minTemperatur,
            maxTemperatur,
            ROUND(avgTemperatur, 2) as avgTemperatur
        FROM tempmma_gs
        WHERE stationId IS NOT NULL
          AND jahr IS NOT NULL
          AND bundesland IS NULL
        ORDER BY jahr DESC, stationId ASC
        """,
        """
        SELECT
            monat,
            bundesland,
            minTemperatur,
            maxTemperatur,
            ROUND(avgTemperatur, 2) as avgTemperatur
        FROM tempmma_gs
        WHERE bundesland IS NOT NULL
          AND monat IS NOT NULL
          AND jahr IS NULL
        ORDER BY monat ASC, bundesland ASC
        """,
    )
    for sql in slice_queries:
        spark.sql(sql).show(limit, truncate=False)
def main(scon, spark):
    """Entry point: register the HDFS views, then run tasks A, B and C."""
    read_parquet_tables(spark)

    # Task A: rollup for the Kempten station plus one plot per granularity.
    kempten_id = 2559
    create_mma_rollup(spark, kempten_id)
    for granularity in ("years", "quartals", "months", "days"):
        plot_date_values(spark, granularity)

    # Task B: monthly aggregates and temperature rankings.
    create_tempmonat(spark)
    print("Rangfolgen 2015:")
    rank_temperatures(spark, 18, 2015)
    print("Rangfolgen Gesamt:")
    rank_temperatures(spark, 18)

    # Task C: grouping sets, sliced three ways.
    create_grouping_sets_view(spark)
    show_seperate_gs(spark, 10)
if __name__ == "__main__":
    # scon / spark are provided by the sparkstart helper imported at the top.
    main(scon, spark)