Compare commits

...

2 Commits

Author SHA1 Message Date
c072850289 bla 2025-11-28 14:43:37 +01:00
296d1c8978 add 9 + 10 2025-11-28 13:23:06 +01:00
5 changed files with 397 additions and 1 deletions

186
Aufgabe 10/Aufgabe10.py Normal file
View File

@@ -0,0 +1,186 @@
from sparkstart import scon, spark
from pyspark.sql import SparkSession
import time
import matplotlib.pyplot as plt
HDFSPATH = "hdfs://193.174.205.250:54310/"
def read_parquets(spark: SparkSession):
stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
stations_df = spark.read.parquet(stations_path)
stations_df.createOrReplaceTempView("german_stations")
products_df = spark.read.parquet(products_path)
products_df.createOrReplaceTempView("german_stations_data")
stations_df.cache()
products_df.cache()
def plot_all_stations(spark: SparkSession):
q = "SELECT geo_laenge AS lon, geo_breite AS lat FROM german_stations WHERE geo_laenge IS NOT NULL AND geo_breite IS NOT NULL"
df = spark.sql(q)
pdf = df.toPandas()
plt.figure(figsize=(8, 6))
plt.scatter(pdf.lon, pdf.lat, s=6, color='red', marker='.')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('All Stations (locations)')
plt.tight_layout()
plt.show()
def duration_circle_size(spark: SparkSession):
q = (
"SELECT stationId, geo_laenge AS lon, geo_breite AS lat, "
"(CAST(SUBSTR(bis_datum,1,4) AS INT) - CAST(SUBSTR(von_datum,1,4) AS INT)) AS duration_years "
"FROM german_stations "
"WHERE TRIM(von_datum)<>'' AND TRIM(bis_datum)<>''"
)
df = spark.sql(q)
pdf = df.toPandas()
pdf['duration_years'] = pdf['duration_years'].fillna(0).astype(int)
sizes = (pdf['duration_years'].clip(lower=0) + 1) * 6
plt.figure(figsize=(8, 6))
plt.scatter(pdf.lon, pdf.lat, s=sizes, alpha=0.6, c=pdf['duration_years'], cmap='viridis')
plt.colorbar(label='Duration (years)')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('Stations with duration (years) as marker size')
plt.tight_layout()
plt.show()
def compute_daily_and_yearly_frosts(spark: SparkSession):
q_daily_max = (
"SELECT stationId, date, SUBSTR(date,1,4) AS year, MAX(TT_TU) AS max_temp "
"FROM german_stations_data "
"GROUP BY stationId, date"
)
daily_max = spark.sql(q_daily_max)
daily_max.createOrReplaceTempView('daily_max')
# mark a day as frost if max_temp < 0
q_daily_frost = (
"SELECT stationId, year, CASE WHEN max_temp < 0 THEN 1 ELSE 0 END AS is_frost "
"FROM daily_max"
)
daily_frost = spark.sql(q_daily_frost)
daily_frost.createOrReplaceTempView('daily_frost')
# yearly frostdays per station
q_station_year = (
"SELECT stationId, year, SUM(is_frost) AS frost_days "
"FROM daily_frost GROUP BY stationId, year"
)
station_year_frost = spark.sql(q_station_year)
station_year_frost.createOrReplaceTempView('station_year_frost')
def frost_analysis(spark: SparkSession, year=2024, station_name_matches=('kempten',)):
compute_daily_and_yearly_frosts(spark)
q_hist = (
f"SELECT frost_days, COUNT(*) AS station_count "
f"FROM station_year_frost WHERE year = '{year}' GROUP BY frost_days ORDER BY frost_days"
)
hist_df = spark.sql(q_hist)
hist_pdf = hist_df.toPandas()
plt.figure(figsize=(8, 5))
plt.bar(hist_pdf.frost_days, hist_pdf.station_count, color='steelblue')
plt.xlabel('Number of Frost Days in year ' + str(year))
plt.ylabel('Number of Stations')
plt.title(f'Stations vs Frost Days ({year})')
plt.tight_layout()
plt.show()
for name in station_name_matches:
q_find = f"SELECT stationId, station_name FROM german_stations WHERE lower(station_name) LIKE '%{name.lower()}%'"
ids_df = spark.sql(q_find)
ids = ids_df.collect()
if not ids:
print(f"No stations found matching '{name}'")
continue
for r in ids:
sid = r['stationId']
sname = r['station_name']
print(f"Analyzing stationId={sid} name={sname}")
# compute frostdays + 5-yr and 20-yr rolling averages using window frame
q_ts = (
"SELECT year, frost_days, "
"AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_5, "
"AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg_20 "
f"FROM station_year_frost WHERE stationId = {sid} ORDER BY CAST(year AS INT)"
)
ts_df = spark.sql(q_ts)
pdf = ts_df.toPandas()
if pdf.empty:
print(f"No yearly frost data for station {sid}")
continue
pdf['year'] = pdf['year'].astype(int)
plt.figure(figsize=(10, 5))
plt.plot(pdf.year, pdf.frost_days, label='Frostdays (year)', marker='o')
plt.plot(pdf.year, pdf.avg_5, label='5-year avg', linestyle='--')
plt.plot(pdf.year, pdf.avg_20, label='20-year avg', linestyle=':')
plt.xlabel('Year')
plt.ylabel('Frost Days')
plt.title(f'Frost Days over Years for {sname} (station {sid})')
plt.legend()
plt.tight_layout()
plt.show()
def height_frost_correlation(spark: SparkSession):
compute_daily_and_yearly_frosts(spark)
q_corr = (
"SELECT syf.year AS year, corr(s.hoehe, syf.frost_days) AS height_frost_corr "
"FROM station_year_frost syf JOIN german_stations s ON syf.stationId = s.stationId "
"GROUP BY syf.year ORDER BY CAST(syf.year AS INT)"
)
corr_df = spark.sql(q_corr)
corr_pdf = corr_df.toPandas()
corr_pdf = corr_pdf.dropna(subset=['height_frost_corr'])
if corr_pdf.empty:
print("No non-NaN correlation values found.")
return
corr_pdf['year'] = corr_pdf['year'].astype(int)
plt.figure(figsize=(10, 5))
plt.bar(corr_pdf.year, corr_pdf.height_frost_corr, color='orange')
plt.xlabel('Year')
plt.ylabel('Correlation (height vs frostdays)')
plt.title('Yearly correlation: station height vs number of frost days')
plt.tight_layout()
plt.show()
def main(scon, spark):
read_parquets(spark)
plot_all_stations(spark)
duration_circle_size(spark)
frost_analysis(spark, year=2024, station_name_matches=('kempten',))
height_frost_correlation(spark)
if __name__ == '__main__':
main(scon, spark)

22
Aufgabe 10/sparkstart.py Normal file
View File

@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
Erzeugen einer Spark-Konfiguration
"""
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# connect to cluster
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.executor.memory", '32g')
conf.set("spark.driver.memory", '8g')
conf.set("spark.cores.max", "40")
scon = SparkContext(conf=conf)
spark = SparkSession \
.builder \
.appName("Python Spark SQL") \
.getOrCreate()

View File

@@ -133,7 +133,7 @@ def plot_avg_tmax_day(station_name):
days = [row['day'] for row in df_avg]
avg_tmax = [row['avg_tmax'] for row in df_avg]
#TODO: Mit SQL machen
# 21-Tage gleitender Durchschnitt (10 Tage davor, Tag selbst, 10 Tage danach)
rolling_avg = []
for i in range(len(avg_tmax)):

167
Aufgabe 9/Aufgabe9.py Normal file
View File

@@ -0,0 +1,167 @@
from sparkstart import scon, spark
from pyspark import SparkContext, rdd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
import matplotlib.pyplot as plt
HDFSPATH = "hdfs://193.174.205.250:54310/"
GHCNDPATH = HDFSPATH + "ghcnd/"
GHCNDHOMEPATH = "/data/ghcnd/"
# Aufgabe 9 a
def import_data(spark: SparkSession, scon: SparkContext):
"""
%time import_data(spark, scon)
"""
# Daten in RDD einlesen
rdd_station = scon.textFile("/data/cdc/hourly/TU_Stundenwerte_Beschreibung_Stationen.txt")
# Entfernen der ersten beiden Zeilen (Header und Trennzeile)
rdd_station_filterd = (rdd_station
.zipWithIndex() # jede Zeile bekommt idx
.filter(lambda x: x[1] >= 2) # nur Zeilen mit idx >= 2 behalten
.map(lambda x: x[0])) # idx wieder entfernen
rdd_station_splitlines = rdd_station_filterd.map(
lambda l: (
int(l[:6].strip()), # Station ID
l[6:15], # von_datum
l[15:24], # bis_datum
float(l[24:40].strip()), # stations höhe
float(l[40:53].strip()), # geoBreite
float(l[53:61].strip()), # geoHöhe
l[61:142], # Stationsname
l[142:-1] # Bundesland
))
# Datenschema festlegen
stationschema = StructType(
[
StructField("stationId", IntegerType(), True),
StructField("von_datum", StringType(), True),
StructField("bis_datum", StringType(), True),
StructField("hoehe", FloatType(), True),
StructField("geo_breite", FloatType(), True),
StructField("geo_laenge", FloatType(), True),
StructField("station_name", StringType(), True),
StructField("bundesland", StringType(), True)
]
)
# Data Frame erzeugen
stationframe = spark.createDataFrame(rdd_station_splitlines, schema=stationschema)
stationframe.printSchema()
# Temporäre View erzeugen
stationframe.createOrReplaceTempView("german_stations")
# Data Frame in HDFS speichern
stationframe.write.mode("overwrite").parquet(
HDFSPATH + "home/heiserervalentin/german_stations.parquet"
)
def read_data_from_parquet(spark):
"""
read_data_from_parquet(spark)
"""
df = spark.read.parquet(HDFSPATH + "home/heiserervalentin/german_stations.parquet")
df.createOrReplaceTempView("german_stations")
df.cache()
def sql_querys(spark):
"""
sql_querys(spark)
"""
spark.sql("SELECT * FROM german_stations").show(5, truncate=False)
spark.sql("SELECT COUNT(*) AS Anzahl FROM german_stations").show()
spark.sql("SELECT MAX(geo_breite) FROM german_stations").show()
df = spark.sql("SELECT * FROM german_stations").toPandas()
plt.figure(figsize=[6,6])
plt.scatter(df.geo_laenge, df.geo_breite, marker='.', color = 'r')
plt.show()
def import_produkt_files(spark: SparkSession, scon: SparkContext, path='/data/cdc/hourly/'):
"""
import_produkt_files(spark, scon)
"""
# Daten in RDD einlesen
rdd_produkt = scon.textFile(f"{path}/produkt*")
# Kopfzeile und Leerzeichen filtern
rdd_filterd = rdd_produkt \
.filter(lambda l: l != 'STATIONS_ID;MESS_DATUM;QN_9;TT_TU;RF_TU;eor') \
.map(lambda l: [x.strip() for x in l.split(';')])
# Zeilen in Felder aufteilen
rdd_produkt_splitlines = rdd_filterd.map(
lambda l: (
int(l[0]), # Stat_id
l[1][:8], # Messdatum
int(l[1][8:10]), # Messstunde
int(l[2]), # Qualitätsniveau
float(l[3]), # Lufttemp.
float(l[4]), # rel. Luftfeuchte
int(l[1][0:4]) # jahr
)
)
print(rdd_produkt_splitlines.take(5))
# Datenschema definieren
product_schema = StructType(
[
StructField("stationId", IntegerType(), True),
StructField("date", StringType(), True),
StructField("hour", IntegerType(), True),
StructField("QN_9", IntegerType(), True),
StructField("TT_TU", FloatType(), True),
StructField("RF_TU", FloatType(), True),
StructField("jahr", IntegerType(), True)
]
)
product_frame = spark.createDataFrame(rdd_produkt_splitlines, schema=product_schema)
product_frame.printSchema()
product_frame.createOrReplaceTempView("german_stations_data")
product_frame.write.mode("overwrite").parquet(
HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
)
def read_product_data_from_parquet(spark):
"""
read_product_data_from_parquet(spark)
"""
df = spark.read.parquet(HDFSPATH + "home/heiserervalentin/german_stations_data.parquet")
df.createOrReplaceTempView("german_stations_data")
df.cache()
def main(scon, spark):
# Daten importieren
import_data(spark, scon)
read_data_from_parquet(spark)
sql_querys(spark)
import_produkt_files(spark, scon)
read_product_data_from_parquet(spark)
if __name__ == "__main__":
main(scon, spark)

21
Aufgabe 9/sparkstart.py Normal file
View File

@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
"""
Erzeugen einer Spark-Konfiguration
"""
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# connect to cluster
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.executor.memory", '32g')
conf.set("spark.driver.memory", '8g')
conf.set("spark.cores.max", "40")
scon = SparkContext(conf=conf)
spark = SparkSession \
.builder \
.appName("Python Spark SQL") \
.getOrCreate()