Mirror of https://github.com/Vale54321/BigData.git, synced 2025-12-11 09:59:33 +01:00

Compare commits: 90524173f4 ... c072850289 (2 commits: 296d1c8978, c072850289)
Aufgabe 10/Aufgabe10.py (Normal file, 186 lines)
@@ -0,0 +1,186 @@
from sparkstart import scon, spark
from pyspark.sql import SparkSession
import time
import matplotlib.pyplot as plt

HDFSPATH = "hdfs://193.174.205.250:54310/"


def read_parquets(spark: SparkSession):
    stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
    products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"

    stations_df = spark.read.parquet(stations_path)
    stations_df.createOrReplaceTempView("german_stations")

    products_df = spark.read.parquet(products_path)
    products_df.createOrReplaceTempView("german_stations_data")

    stations_df.cache()
    products_df.cache()


def plot_all_stations(spark: SparkSession):
    q = "SELECT geo_laenge AS lon, geo_breite AS lat FROM german_stations WHERE geo_laenge IS NOT NULL AND geo_breite IS NOT NULL"
    df = spark.sql(q)

    pdf = df.toPandas()
    plt.figure(figsize=(8, 6))
    plt.scatter(pdf.lon, pdf.lat, s=6, color='red', marker='.')
    plt.xlabel('Longitude')
    plt.ylabel('Latitude')
    plt.title('All Stations (locations)')
    plt.tight_layout()
    plt.show()


def duration_circle_size(spark: SparkSession):
    q = (
        "SELECT stationId, geo_laenge AS lon, geo_breite AS lat, "
        "(CAST(SUBSTR(bis_datum,1,4) AS INT) - CAST(SUBSTR(von_datum,1,4) AS INT)) AS duration_years "
        "FROM german_stations "
        "WHERE TRIM(von_datum)<>'' AND TRIM(bis_datum)<>''"
    )
    df = spark.sql(q)

    pdf = df.toPandas()

    pdf['duration_years'] = pdf['duration_years'].fillna(0).astype(int)
    sizes = (pdf['duration_years'].clip(lower=0) + 1) * 6

    plt.figure(figsize=(8, 6))
    plt.scatter(pdf.lon, pdf.lat, s=sizes, alpha=0.6, c=pdf['duration_years'], cmap='viridis')
    plt.colorbar(label='Duration (years)')
    plt.xlabel('Longitude')
    plt.ylabel('Latitude')
    plt.title('Stations with duration (years) as marker size')
    plt.tight_layout()
    plt.show()


def compute_daily_and_yearly_frosts(spark: SparkSession):
    # daily maximum temperature per station
    q_daily_max = (
        "SELECT stationId, date, SUBSTR(date,1,4) AS year, MAX(TT_TU) AS max_temp "
        "FROM german_stations_data "
        "GROUP BY stationId, date"
    )
    daily_max = spark.sql(q_daily_max)
    daily_max.createOrReplaceTempView('daily_max')

    # mark a day as a frost day if max_temp < 0
    q_daily_frost = (
        "SELECT stationId, year, CASE WHEN max_temp < 0 THEN 1 ELSE 0 END AS is_frost "
        "FROM daily_max"
    )
    daily_frost = spark.sql(q_daily_frost)
    daily_frost.createOrReplaceTempView('daily_frost')

    # yearly frost days per station
    q_station_year = (
        "SELECT stationId, year, SUM(is_frost) AS frost_days "
        "FROM daily_frost GROUP BY stationId, year"
    )
    station_year_frost = spark.sql(q_station_year)
    station_year_frost.createOrReplaceTempView('station_year_frost')


def frost_analysis(spark: SparkSession, year=2024, station_name_matches=('kempten',)):
    compute_daily_and_yearly_frosts(spark)

    # histogram: number of stations per frost-day count in the given year
    q_hist = (
        f"SELECT frost_days, COUNT(*) AS station_count "
        f"FROM station_year_frost WHERE year = '{year}' GROUP BY frost_days ORDER BY frost_days"
    )
    hist_df = spark.sql(q_hist)

    hist_pdf = hist_df.toPandas()
    plt.figure(figsize=(8, 5))
    plt.bar(hist_pdf.frost_days, hist_pdf.station_count, color='steelblue')
    plt.xlabel('Number of Frost Days in year ' + str(year))
    plt.ylabel('Number of Stations')
    plt.title(f'Stations vs Frost Days ({year})')
    plt.tight_layout()
    plt.show()

    for name in station_name_matches:
        q_find = f"SELECT stationId, station_name FROM german_stations WHERE lower(station_name) LIKE '%{name.lower()}%'"
        ids_df = spark.sql(q_find)
        ids = ids_df.collect()
        if not ids:
            print(f"No stations found matching '{name}'")
            continue
        for r in ids:
            sid = r['stationId']
            sname = r['station_name']
            print(f"Analyzing stationId={sid} name={sname}")

            # frost days per year plus 5-year and 20-year rolling averages via window frames
            q_ts = (
                "SELECT year, frost_days, "
                "AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_5, "
                "AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg_20 "
                f"FROM station_year_frost WHERE stationId = {sid} ORDER BY CAST(year AS INT)"
            )
            ts_df = spark.sql(q_ts)

            pdf = ts_df.toPandas()
            if pdf.empty:
                print(f"No yearly frost data for station {sid}")
                continue

            pdf['year'] = pdf['year'].astype(int)
            plt.figure(figsize=(10, 5))
            plt.plot(pdf.year, pdf.frost_days, label='Frost days (year)', marker='o')
            plt.plot(pdf.year, pdf.avg_5, label='5-year avg', linestyle='--')
            plt.plot(pdf.year, pdf.avg_20, label='20-year avg', linestyle=':')
            plt.xlabel('Year')
            plt.ylabel('Frost Days')
            plt.title(f'Frost Days over Years for {sname} (station {sid})')
            plt.legend()
            plt.tight_layout()
            plt.show()


def height_frost_correlation(spark: SparkSession):
    compute_daily_and_yearly_frosts(spark)

    # per-year correlation between station elevation and number of frost days
    q_corr = (
        "SELECT syf.year AS year, corr(s.hoehe, syf.frost_days) AS height_frost_corr "
        "FROM station_year_frost syf JOIN german_stations s ON syf.stationId = s.stationId "
        "GROUP BY syf.year ORDER BY CAST(syf.year AS INT)"
    )

    corr_df = spark.sql(q_corr)

    corr_pdf = corr_df.toPandas()

    corr_pdf = corr_pdf.dropna(subset=['height_frost_corr'])
    if corr_pdf.empty:
        print("No non-NaN correlation values found.")
        return

    corr_pdf['year'] = corr_pdf['year'].astype(int)
    plt.figure(figsize=(10, 5))
    plt.bar(corr_pdf.year, corr_pdf.height_frost_corr, color='orange')
    plt.xlabel('Year')
    plt.ylabel('Correlation (height vs frost days)')
    plt.title('Yearly correlation: station height vs number of frost days')
    plt.tight_layout()
    plt.show()


def main(scon, spark):
    read_parquets(spark)

    plot_all_stations(spark)

    duration_circle_size(spark)

    frost_analysis(spark, year=2024, station_name_matches=('kempten',))

    height_frost_correlation(spark)


if __name__ == '__main__':
    main(scon, spark)
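A side note on the rolling averages computed in q_ts: the same 5-year and 20-year frames can be expressed with the DataFrame API instead of SQL. The following is a minimal sketch, not part of the submitted script; it assumes the station_year_frost view created by compute_daily_and_yearly_frosts already exists.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def rolling_frost_days_sketch(spark: SparkSession):
    # read the temp view registered by compute_daily_and_yearly_frosts
    syf = spark.table("station_year_frost").withColumn("year_int", F.col("year").cast("int"))
    # 5-year and 20-year trailing windows per station, ordered by year
    w5 = Window.partitionBy("stationId").orderBy("year_int").rowsBetween(-4, 0)
    w20 = Window.partitionBy("stationId").orderBy("year_int").rowsBetween(-19, 0)
    return (syf
            .withColumn("avg_5", F.avg("frost_days").over(w5))
            .withColumn("avg_20", F.avg("frost_days").over(w20)))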
Aufgabe 10/sparkstart.py (Normal file, 22 lines)
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-

"""
Create a Spark configuration
"""

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# connect to cluster
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.executor.memory", '32g')
conf.set("spark.driver.memory", '8g')
conf.set("spark.cores.max", "40")
scon = SparkContext(conf=conf)


spark = SparkSession \
    .builder \
    .appName("Python Spark SQL") \
    .getOrCreate()
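For reference, the exercise scripts consume this module via `from sparkstart import scon, spark`. A minimal usage sketch follows; the explicit shutdown at the end is an assumption for illustration and does not appear in the original files.

from sparkstart import scon, spark

df = spark.range(10)   # any job against the shared session
print(df.count())

spark.stop()           # also stops the underlying SparkContext (scon)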
@@ -133,7 +133,7 @@ def plot_avg_tmax_day(station_name):
    days = [row['day'] for row in df_avg]
    avg_tmax = [row['avg_tmax'] for row in df_avg]

    # TODO: do this with SQL instead
    # 21-day moving average (10 days before, the day itself, 10 days after)
    rolling_avg = []
    for i in range(len(avg_tmax)):
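The TODO in this hunk (compute the moving average in SQL rather than in a Python loop) could be handled with a window frame. A sketch, assuming the per-day averages are available in a temp view named avg_tmax_per_day with columns day and avg_tmax; the view and column names are assumptions, not taken from the original:

q_rolling = (
    "SELECT day, avg_tmax, "
    "AVG(avg_tmax) OVER (ORDER BY day ROWS BETWEEN 10 PRECEDING AND 10 FOLLOWING) AS rolling_avg "
    "FROM avg_tmax_per_day ORDER BY day"
)
rolling_df = spark.sql(q_rolling)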
Aufgabe 9/Aufgabe9.py (Normal file, 167 lines)
@@ -0,0 +1,167 @@
from sparkstart import scon, spark
from pyspark import SparkContext, rdd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType

import matplotlib.pyplot as plt

HDFSPATH = "hdfs://193.174.205.250:54310/"
GHCNDPATH = HDFSPATH + "ghcnd/"
GHCNDHOMEPATH = "/data/ghcnd/"


# Aufgabe 9 a

def import_data(spark: SparkSession, scon: SparkContext):
    """
    %time import_data(spark, scon)
    """

    # read the station description file into an RDD
    rdd_station = scon.textFile("/data/cdc/hourly/TU_Stundenwerte_Beschreibung_Stationen.txt")

    # drop the first two lines (header and separator line)
    rdd_station_filterd = (rdd_station
                           .zipWithIndex()               # attach an index to every line
                           .filter(lambda x: x[1] >= 2)  # keep only lines with index >= 2
                           .map(lambda x: x[0]))         # drop the index again

    # split the fixed-width lines into typed fields
    rdd_station_splitlines = rdd_station_filterd.map(
        lambda l: (
            int(l[:6].strip()),       # station ID
            l[6:15],                  # von_datum
            l[15:24],                 # bis_datum
            float(l[24:40].strip()),  # station elevation
            float(l[40:53].strip()),  # geo_breite (latitude)
            float(l[53:61].strip()),  # geo_laenge (longitude)
            l[61:142],                # station name
            l[142:-1]                 # Bundesland
        ))

    # define the schema
    stationschema = StructType(
        [
            StructField("stationId", IntegerType(), True),
            StructField("von_datum", StringType(), True),
            StructField("bis_datum", StringType(), True),
            StructField("hoehe", FloatType(), True),
            StructField("geo_breite", FloatType(), True),
            StructField("geo_laenge", FloatType(), True),
            StructField("station_name", StringType(), True),
            StructField("bundesland", StringType(), True)
        ]
    )

    # create the DataFrame
    stationframe = spark.createDataFrame(rdd_station_splitlines, schema=stationschema)
    stationframe.printSchema()

    # create a temporary view
    stationframe.createOrReplaceTempView("german_stations")

    # write the DataFrame to HDFS
    stationframe.write.mode("overwrite").parquet(
        HDFSPATH + "home/heiserervalentin/german_stations.parquet"
    )


def read_data_from_parquet(spark):
    """
    read_data_from_parquet(spark)
    """
    df = spark.read.parquet(HDFSPATH + "home/heiserervalentin/german_stations.parquet")
    df.createOrReplaceTempView("german_stations")
    df.cache()


def sql_querys(spark):
    """
    sql_querys(spark)
    """
    spark.sql("SELECT * FROM german_stations").show(5, truncate=False)
    spark.sql("SELECT COUNT(*) AS Anzahl FROM german_stations").show()
    spark.sql("SELECT MAX(geo_breite) FROM german_stations").show()
    df = spark.sql("SELECT * FROM german_stations").toPandas()

    plt.figure(figsize=[6, 6])
    plt.scatter(df.geo_laenge, df.geo_breite, marker='.', color='r')

    plt.show()


def import_produkt_files(spark: SparkSession, scon: SparkContext, path='/data/cdc/hourly/'):
    """
    import_produkt_files(spark, scon)
    """

    # read the produkt_* measurement files into an RDD
    rdd_produkt = scon.textFile(f"{path}/produkt*")

    # filter out the header line and strip whitespace
    rdd_filterd = rdd_produkt \
        .filter(lambda l: l != 'STATIONS_ID;MESS_DATUM;QN_9;TT_TU;RF_TU;eor') \
        .map(lambda l: [x.strip() for x in l.split(';')])

    # split the lines into typed fields
    rdd_produkt_splitlines = rdd_filterd.map(
        lambda l: (
            int(l[0]),         # station ID
            l[1][:8],          # measurement date
            int(l[1][8:10]),   # measurement hour
            int(l[2]),         # quality level
            float(l[3]),       # air temperature
            float(l[4]),       # relative humidity
            int(l[1][0:4])     # year
        )
    )

    print(rdd_produkt_splitlines.take(5))

    # define the schema
    product_schema = StructType(
        [
            StructField("stationId", IntegerType(), True),
            StructField("date", StringType(), True),
            StructField("hour", IntegerType(), True),
            StructField("QN_9", IntegerType(), True),
            StructField("TT_TU", FloatType(), True),
            StructField("RF_TU", FloatType(), True),
            StructField("jahr", IntegerType(), True)
        ]
    )

    product_frame = spark.createDataFrame(rdd_produkt_splitlines, schema=product_schema)
    product_frame.printSchema()
    product_frame.createOrReplaceTempView("german_stations_data")

    product_frame.write.mode("overwrite").parquet(
        HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
    )


def read_product_data_from_parquet(spark):
    """
    read_product_data_from_parquet(spark)
    """
    df = spark.read.parquet(HDFSPATH + "home/heiserervalentin/german_stations_data.parquet")
    df.createOrReplaceTempView("german_stations_data")
    df.cache()


def main(scon, spark):
    # import the data
    import_data(spark, scon)
    read_data_from_parquet(spark)
    sql_querys(spark)

    import_produkt_files(spark, scon)
    read_product_data_from_parquet(spark)


if __name__ == "__main__":
    main(scon, spark)
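One caveat about import_produkt_files above: the int()/float() casts assume every non-header line is complete and well-formed, so a single malformed row fails the whole Spark job. A defensive variant of the parse step could look like the sketch below; it is not part of the submitted solution and assumes the rdd_filterd RDD from inside import_produkt_files.

def parse_produkt_line(fields):
    # return None for malformed rows instead of raising inside the Spark job
    try:
        return (
            int(fields[0]),          # station ID
            fields[1][:8],           # measurement date (YYYYMMDD)
            int(fields[1][8:10]),    # measurement hour
            int(fields[2]),          # quality level QN_9
            float(fields[3]),        # air temperature TT_TU
            float(fields[4]),        # relative humidity RF_TU
            int(fields[1][0:4])      # year
        )
    except (ValueError, IndexError):
        return None

rdd_produkt_clean = rdd_filterd.map(parse_produkt_line).filter(lambda row: row is not None)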
Aufgabe 9/sparkstart.py (Normal file, 21 lines)
@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-

"""
Create a Spark configuration
"""

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# connect to cluster
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.executor.memory", '32g')
conf.set("spark.driver.memory", '8g')
conf.set("spark.cores.max", "40")
scon = SparkContext(conf=conf)

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL") \
    .getOrCreate()