mirror of
https://github.com/Vale54321/BigData.git
synced 2025-12-13 02:49:32 +01:00
Compare commits
7 Commits
90524173f4
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 622a228fb7 | |||
| d18e9823e5 | |||
| 2b26188647 | |||
| f89d39d420 | |||
| de3782d570 | |||
| c072850289 | |||
| 296d1c8978 |
@@ -1,209 +1,195 @@
|
||||
from sparkstart import scon, spark
|
||||
import ghcnd_stations
|
||||
import matplotlib.pyplot as plt
|
||||
from pyspark.sql import SparkSession
|
||||
import time
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
# a) Scatterplot: alle Stationen (lon/lat)
|
||||
def plot_all_stations(spark):
|
||||
q = """
|
||||
SELECT stationname, latitude, longitude
|
||||
FROM cdc_stations
|
||||
WHERE latitude IS NOT NULL AND longitude IS NOT NULL
|
||||
"""
|
||||
t0 = time.time()
|
||||
rows = spark.sql(q).collect()
|
||||
t1 = time.time()
|
||||
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Rows: {len(rows)}")
|
||||
HDFSPATH = "hdfs://193.174.205.250:54310/"
|
||||
|
||||
lats = [r['latitude'] for r in rows]
|
||||
lons = [r['longitude'] for r in rows]
|
||||
names = [r['stationname'] for r in rows]
|
||||
def read_parquets(spark: SparkSession):
|
||||
stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
|
||||
products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
|
||||
|
||||
plt.figure(figsize=(8,6))
|
||||
plt.scatter(lons, lats, s=10, alpha=0.6)
|
||||
plt.xlabel('Longitude')
|
||||
plt.ylabel('Latitude')
|
||||
plt.title('Alle CDC-Stationen (Scatter)')
|
||||
plt.grid(True)
|
||||
stations_df = spark.read.parquet(stations_path)
|
||||
stations_df.createOrReplaceTempView("german_stations")
|
||||
|
||||
products_df = spark.read.parquet(products_path)
|
||||
products_df.createOrReplaceTempView("german_stations_data")
|
||||
|
||||
stations_df.cache()
|
||||
products_df.cache()
|
||||
|
||||
|
||||
def plot_all_stations(spark: SparkSession):
|
||||
q = "SELECT geo_laenge AS lon, geo_breite AS lat FROM german_stations WHERE geo_laenge IS NOT NULL AND geo_breite IS NOT NULL"
|
||||
df = spark.sql(q)
|
||||
|
||||
pdf = df.toPandas()
|
||||
plt.figure(figsize=(8, 6))
|
||||
plt.scatter(pdf.lon, pdf.lat, s=6, color='red', marker='.')
|
||||
plt.xlabel('Longitude')
|
||||
plt.ylabel('Latitude')
|
||||
plt.title('All Stations (locations)')
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
|
||||
|
||||
def duration_circle_size(spark: SparkSession):
|
||||
q = (
|
||||
"SELECT stationId, geo_laenge AS lon, geo_breite AS lat, "
|
||||
"(CAST(SUBSTR(bis_datum,1,4) AS INT) - CAST(SUBSTR(von_datum,1,4) AS INT)) AS duration_years "
|
||||
"FROM german_stations "
|
||||
"WHERE TRIM(von_datum)<>'' AND TRIM(bis_datum)<>''"
|
||||
)
|
||||
df = spark.sql(q)
|
||||
|
||||
pdf = df.toPandas()
|
||||
|
||||
pdf['duration_years'] = pdf['duration_years'].fillna(0).astype(int)
|
||||
sizes = (pdf['duration_years'].clip(lower=0) + 1) * 6
|
||||
|
||||
plt.figure(figsize=(8, 6))
|
||||
plt.scatter(pdf.lon, pdf.lat, s=sizes, alpha=0.6, c=pdf['duration_years'], cmap='viridis')
|
||||
plt.colorbar(label='Duration (years)')
|
||||
plt.xlabel('Longitude')
|
||||
plt.ylabel('Latitude')
|
||||
plt.title('Stations with duration (years) as marker size')
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
|
||||
|
||||
def compute_daily_and_yearly_frosts(spark: SparkSession):
|
||||
q_daily_max = (
|
||||
"SELECT stationId, date, SUBSTR(CAST(date AS STRING),1,4) AS year, MAX(TT_TU) AS max_temp "
|
||||
"FROM german_stations_data "
|
||||
"WHERE TT_TU IS NOT NULL AND TT_TU > -50 AND TT_TU < 60 "
|
||||
"GROUP BY stationId, date"
|
||||
)
|
||||
daily_max = spark.sql(q_daily_max)
|
||||
daily_max.createOrReplaceTempView('daily_max')
|
||||
|
||||
# mark a day as frost if max_temp < 0
|
||||
q_daily_frost = (
|
||||
"SELECT stationId, year, CASE WHEN max_temp < 0 THEN 1 ELSE 0 END AS is_frost "
|
||||
"FROM daily_max"
|
||||
)
|
||||
daily_frost = spark.sql(q_daily_frost)
|
||||
daily_frost.createOrReplaceTempView('daily_frost')
|
||||
|
||||
# yearly frostdays per station
|
||||
q_station_year = (
|
||||
"SELECT stationId, year, SUM(is_frost) AS frost_days "
|
||||
"FROM daily_frost GROUP BY stationId, year"
|
||||
)
|
||||
station_year_frost = spark.sql(q_station_year)
|
||||
station_year_frost.createOrReplaceTempView('station_year_frost')
|
||||
|
||||
|
||||
def frost_analysis(spark: SparkSession, year=2024, station_name_matches=('kempten',)):
|
||||
compute_daily_and_yearly_frosts(spark)
|
||||
|
||||
q_hist = (
|
||||
f"SELECT frost_days, COUNT(*) AS station_count "
|
||||
f"FROM station_year_frost WHERE year = '{year}' GROUP BY frost_days ORDER BY frost_days"
|
||||
)
|
||||
hist_df = spark.sql(q_hist)
|
||||
|
||||
hist_pdf = hist_df.toPandas()
|
||||
|
||||
if hist_pdf.empty:
|
||||
print(f"No frost data found for year {year}. Trying to find available years...")
|
||||
q_all = "SELECT frost_days, COUNT(*) AS station_count FROM station_year_frost GROUP BY frost_days ORDER BY frost_days"
|
||||
hist_pdf = spark.sql(q_all).toPandas()
|
||||
if hist_pdf.empty:
|
||||
print("No frost data available at all. Check if TT_TU column contains valid temperature data.")
|
||||
return
|
||||
print(f"Found {len(hist_pdf)} frost day categories across all years")
|
||||
|
||||
plt.figure(figsize=(8, 5))
|
||||
plt.bar(hist_pdf.frost_days, hist_pdf.station_count, color='steelblue')
|
||||
plt.xlabel('Number of Frost Days in year ' + str(year))
|
||||
plt.ylabel('Number of Stations')
|
||||
plt.title(f'Stations vs Frost Days ({year})')
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
|
||||
for name in station_name_matches:
|
||||
q_find = f"SELECT stationId, station_name FROM german_stations WHERE lower(station_name) LIKE '%{name.lower()}%'"
|
||||
ids_df = spark.sql(q_find)
|
||||
ids = ids_df.collect()
|
||||
if not ids:
|
||||
print(f"No stations found matching '{name}'")
|
||||
continue
|
||||
for r in ids:
|
||||
sid = r['stationId']
|
||||
sname = r['station_name']
|
||||
print(f"Analyzing stationId={sid} name={sname}")
|
||||
|
||||
# b) Scatterplot: Stationsdauer in Jahren als Marker-Size
|
||||
def plot_station_duration(spark, size_factor=20):
|
||||
q = """
|
||||
SELECT
|
||||
stationname,
|
||||
latitude,
|
||||
longitude,
|
||||
(CAST(CASE WHEN length(to_date) >= 4 THEN substr(to_date,1,4) ELSE year(current_date()) END AS INT)
|
||||
- CAST(substr(from_date,1,4) AS INT)) AS years
|
||||
FROM cdc_stations
|
||||
WHERE latitude IS NOT NULL AND longitude IS NOT NULL
|
||||
"""
|
||||
t0 = time.time()
|
||||
rows = spark.sql(q).collect()
|
||||
t1 = time.time()
|
||||
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Rows: {len(rows)}")
|
||||
q_ts = (
|
||||
"SELECT year, frost_days, "
|
||||
"AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_5, "
|
||||
"AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) RANGE BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg_20 "
|
||||
f"FROM station_year_frost WHERE stationId = {sid} ORDER BY CAST(year AS INT)"
|
||||
)
|
||||
ts_df = spark.sql(q_ts)
|
||||
|
||||
lats = [r['latitude'] for r in rows]
|
||||
lons = [r['longitude'] for r in rows]
|
||||
years = [r['years'] if r['years'] is not None else 0 for r in rows]
|
||||
sizes = [max(5, (y+1) * size_factor) for y in years]
|
||||
pdf = ts_df.toPandas()
|
||||
if pdf.empty:
|
||||
print(f"No yearly frost data for station {sid}")
|
||||
continue
|
||||
|
||||
plt.figure(figsize=(8,6))
|
||||
plt.scatter(lons, lats, s=sizes, alpha=0.6)
|
||||
plt.xlabel('Longitude')
|
||||
plt.ylabel('Latitude')
|
||||
plt.title('CDC-Stationen: Dauer der Verfuegbarkeit (Größe ~ Jahre)')
|
||||
plt.grid(True)
|
||||
plt.show()
|
||||
|
||||
def plot_frost_distribution_year(spark, year):
|
||||
q = f"""
|
||||
WITH daily_max AS (
|
||||
SELECT stationid, date, MAX(tt_tu) AS max_temp
|
||||
FROM cdc_hourly
|
||||
WHERE length(date) >= 8 AND substr(date,1,4) = '{year}'
|
||||
GROUP BY stationid, date
|
||||
),
|
||||
station_frost AS (
|
||||
SELECT dm.stationid, SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays
|
||||
FROM daily_max dm
|
||||
GROUP BY dm.stationid
|
||||
)
|
||||
SELECT sf.frostdays, COUNT(*) AS stations
|
||||
FROM station_frost sf
|
||||
GROUP BY sf.frostdays
|
||||
ORDER BY sf.frostdays
|
||||
"""
|
||||
t0 = time.time()
|
||||
rows = spark.sql(q).collect()
|
||||
t1 = time.time()
|
||||
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Distinct frostdays: {len(rows)}")
|
||||
|
||||
x = [r['frostdays'] for r in rows]
|
||||
y = [r['stations'] for r in rows]
|
||||
|
||||
plt.figure(figsize=(8,5))
|
||||
plt.bar(x, y)
|
||||
plt.xlabel('Anzahl Frosttage im Jahr ' + str(year))
|
||||
plt.ylabel('Anzahl Stationen')
|
||||
plt.title(f'Verteilung der Frosttage pro Station im Jahr {year}')
|
||||
plt.grid(True)
|
||||
plt.show()
|
||||
pdf['year'] = pdf['year'].astype(int)
|
||||
plt.figure(figsize=(10, 5))
|
||||
plt.plot(pdf.year, pdf.frost_days, label='Frostdays (year)', marker='o')
|
||||
plt.plot(pdf.year, pdf.avg_5, label='5-year avg', linestyle='--')
|
||||
plt.plot(pdf.year, pdf.avg_20, label='20-year avg', linestyle=':')
|
||||
plt.xlabel('Year')
|
||||
plt.ylabel('Frost Days')
|
||||
plt.title(f'Frost Days over Years for {sname} (station {sid})')
|
||||
plt.legend()
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
|
||||
|
||||
# c2) Frosttage Zeitreihe für eine Station mit 5- und 20-Jahres Durchschnitt (SQL window)
|
||||
def plot_station_frost_timeseries(spark, station_name):
|
||||
q = f"""
|
||||
WITH daily_max AS (
|
||||
SELECT stationid, date, MAX(tt_tu) AS max_temp
|
||||
FROM cdc_hourly
|
||||
GROUP BY stationid, date
|
||||
),
|
||||
yearly AS (
|
||||
SELECT
|
||||
dm.stationid,
|
||||
CAST(substr(dm.date,1,4) AS INT) AS year,
|
||||
SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays
|
||||
FROM daily_max dm
|
||||
GROUP BY dm.stationid, CAST(substr(dm.date,1,4) AS INT)
|
||||
),
|
||||
station_yearly AS (
|
||||
SELECT
|
||||
y.year,
|
||||
y.frostdays,
|
||||
AVG(y.frostdays) OVER (ORDER BY y.year ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg5,
|
||||
AVG(y.frostdays) OVER (ORDER BY y.year ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg20
|
||||
FROM yearly y
|
||||
JOIN cdc_stations s ON y.stationid = s.stationid
|
||||
WHERE trim(upper(s.stationname)) = '{station_name.upper()}'
|
||||
ORDER BY y.year
|
||||
)
|
||||
SELECT * FROM station_yearly
|
||||
"""
|
||||
t0 = time.time()
|
||||
rows = spark.sql(q).collect()
|
||||
t1 = time.time()
|
||||
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Years: {len(rows)}")
|
||||
def height_frost_correlation(spark: SparkSession):
|
||||
compute_daily_and_yearly_frosts(spark)
|
||||
|
||||
if not rows:
|
||||
print(f"Keine Daten f\u00fcr Station '{station_name}'.")
|
||||
return
|
||||
q_corr = (
|
||||
"SELECT syf.year AS year, corr(s.hoehe, syf.frost_days) AS height_frost_corr "
|
||||
"FROM station_year_frost syf JOIN german_stations s ON syf.stationId = s.stationId "
|
||||
"GROUP BY syf.year ORDER BY CAST(syf.year AS INT)"
|
||||
)
|
||||
|
||||
corr_df = spark.sql(q_corr)
|
||||
|
||||
years = [r['year'] for r in rows]
|
||||
frostdays = [r['frostdays'] for r in rows]
|
||||
avg5 = [r['avg5'] for r in rows]
|
||||
avg20 = [r['avg20'] for r in rows]
|
||||
corr_pdf = corr_df.toPandas()
|
||||
|
||||
plt.figure(figsize=(10,5))
|
||||
plt.plot(years, frostdays, label='Frosttage (Jahr)')
|
||||
plt.plot(years, avg5, label='5-Jahres-Durchschnitt')
|
||||
plt.plot(years, avg20, label='20-Jahres-Durchschnitt')
|
||||
plt.xlabel('Jahr')
|
||||
plt.ylabel('Anzahl Frosttage')
|
||||
plt.title(f'Frosttage f\u00fcr Station {station_name}')
|
||||
plt.legend()
|
||||
plt.grid(True)
|
||||
plt.show()
|
||||
corr_pdf = corr_pdf.dropna(subset=['height_frost_corr'])
|
||||
if corr_pdf.empty:
|
||||
print("No non-NaN correlation values found.")
|
||||
return
|
||||
|
||||
corr_pdf['year'] = corr_pdf['year'].astype(int)
|
||||
plt.figure(figsize=(10, 5))
|
||||
plt.bar(corr_pdf.year, corr_pdf.height_frost_corr, color='orange')
|
||||
plt.xlabel('Year')
|
||||
plt.ylabel('Correlation (height vs frostdays)')
|
||||
plt.title('Yearly correlation: station height vs number of frost days')
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
|
||||
|
||||
# d) Korrelation Hoehe vs. Frosttage pro Jahr
|
||||
def plot_height_frost_correlation(spark):
|
||||
q = """
|
||||
WITH daily_max AS (
|
||||
SELECT stationid, date, MAX(tt_tu) AS max_temp
|
||||
FROM cdc_hourly
|
||||
GROUP BY stationid, date
|
||||
),
|
||||
yearly AS (
|
||||
SELECT
|
||||
dm.stationid,
|
||||
CAST(substr(dm.date,1,4) AS INT) AS year,
|
||||
SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays
|
||||
FROM daily_max dm
|
||||
GROUP BY dm.stationid, CAST(substr(dm.date,1,4) AS INT)
|
||||
),
|
||||
joined AS (
|
||||
SELECT y.year, s.height, y.frostdays
|
||||
FROM yearly y
|
||||
JOIN cdc_stations s ON y.stationid = s.stationid
|
||||
),
|
||||
yearly_corr AS (
|
||||
SELECT year, corr(height, frostdays) AS corr
|
||||
FROM joined
|
||||
GROUP BY year
|
||||
ORDER BY year
|
||||
)
|
||||
SELECT year, corr FROM yearly_corr WHERE corr IS NOT NULL
|
||||
"""
|
||||
t0 = time.time()
|
||||
rows = spark.sql(q).collect()
|
||||
t1 = time.time()
|
||||
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Years with corr: {len(rows)}")
|
||||
def main(scon, spark):
|
||||
read_parquets(spark)
|
||||
|
||||
if not rows:
|
||||
print("Keine Korrelationsdaten verfügbar.")
|
||||
return
|
||||
plot_all_stations(spark)
|
||||
|
||||
years = [r['year'] for r in rows]
|
||||
corr = [r['corr'] for r in rows]
|
||||
duration_circle_size(spark)
|
||||
|
||||
plt.figure(figsize=(10,5))
|
||||
plt.bar(years, corr)
|
||||
plt.xlabel('Jahr')
|
||||
plt.ylabel('Korrelationskoeffizient (height vs frostdays)')
|
||||
plt.title('Korrelation Hoehe vs. Frosttage pro Jahr')
|
||||
plt.grid(True)
|
||||
plt.show()
|
||||
frost_analysis(spark, year=2024, station_name_matches=('kempten',))
|
||||
|
||||
height_frost_correlation(spark)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
ghcnd_stations.read_ghcnd_from_parquet(spark)
|
||||
main(scon, spark)
|
||||
|
||||
plot_all_stations(spark)
|
||||
plot_station_duration(spark)
|
||||
plot_frost_distribution_year(spark, '2010')
|
||||
plot_station_frost_timeseries(spark, 'KEMPTEN')
|
||||
plot_height_frost_correlation(spark)
|
||||
pass
|
||||
|
||||
@@ -1,689 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Load stations, countries, inventory and data from GHCND as Dataset.
|
||||
|
||||
@author: steger
|
||||
|
||||
"""
|
||||
|
||||
# pylint: disable=pointless-string-statement
|
||||
|
||||
import os
|
||||
from datetime import date
|
||||
from time import time
|
||||
from subprocess import call
|
||||
from pyspark.sql.types import StructType
|
||||
from pyspark.sql.types import StructField
|
||||
from pyspark.sql.types import StringType
|
||||
from pyspark.sql.types import FloatType
|
||||
from pyspark.sql.types import IntegerType
|
||||
from pyspark.sql.types import DateType
|
||||
|
||||
|
||||
# =============================================
|
||||
# run sparkstart.py before to create a session
|
||||
# =============================================
|
||||
|
||||
HDFSPATH = "hdfs://193.174.205.250:54310/"
|
||||
GHCNDPATH = HDFSPATH + "ghcnd/"
|
||||
GHCNDHOMEPATH = "/data/ghcnd/"
|
||||
|
||||
|
||||
def conv_elevation(elev):
|
||||
"""
|
||||
Convert an elevation value.
|
||||
|
||||
-999.9 means there is no value.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
elev : string
|
||||
The elevation to convert to float.
|
||||
|
||||
Returns
|
||||
-------
|
||||
res : numeric
|
||||
The converted value as float.
|
||||
"""
|
||||
elev = elev.strip()
|
||||
if elev == "-999.9":
|
||||
res = None
|
||||
else:
|
||||
res = float(elev)
|
||||
return res
|
||||
|
||||
|
||||
def conv_data_value(line, start):
|
||||
"""
|
||||
Convert a single data value from a dly.- File.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
line : string
|
||||
The line with the data value.
|
||||
start : int
|
||||
The index at which the value starts.
|
||||
|
||||
Returns
|
||||
-------
|
||||
res : numeric
|
||||
The onverted data value as int.
|
||||
"""
|
||||
return int(line[start:start+5].strip())
|
||||
|
||||
|
||||
def import_ghcnd_stations(scon, spark, path):
|
||||
"""
|
||||
Read the station data into a dataframe.
|
||||
|
||||
Register it as temporary view and write it to parquet.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
scon : SparkContext
|
||||
The spark context.
|
||||
spark : SparkSession
|
||||
The SQL session.
|
||||
|
||||
Returns
|
||||
-------
|
||||
stationFrame : DataFrame
|
||||
The spark Data Frame with the stations data.
|
||||
"""
|
||||
stationlines = scon.textFile(path + "ghcnd-stations.txt")
|
||||
stationsplitlines = stationlines.map(
|
||||
lambda l:
|
||||
(l[0:2],
|
||||
l[2:3],
|
||||
l[0:11],
|
||||
float(l[12:20].strip()),
|
||||
float(l[21:30].strip()),
|
||||
conv_elevation(l[31:37]),
|
||||
l[41:71]
|
||||
))
|
||||
stationschema = StructType([
|
||||
StructField('countrycode', StringType(), True),
|
||||
StructField('networkcode', StringType(), True),
|
||||
StructField('stationid', StringType(), True),
|
||||
StructField('latitude', FloatType(), True),
|
||||
StructField('longitude', FloatType(), True),
|
||||
StructField('elevation', FloatType(), True),
|
||||
StructField('stationname', StringType(), True)
|
||||
])
|
||||
stationframe = spark.createDataFrame(stationsplitlines,
|
||||
schema=stationschema)
|
||||
stationframe.createOrReplaceTempView("ghcndstations")
|
||||
stationframe.write.mode('overwrite').parquet(
|
||||
GHCNDPATH + "ghcndstations.parquet")
|
||||
stationframe.cache()
|
||||
print("Imported GhcndStations")
|
||||
return stationframe
|
||||
|
||||
|
||||
def import_ghcnd_countries(scon, spark, path):
|
||||
"""
|
||||
Read the countries data into a dataframe.
|
||||
|
||||
Register it as temptable and write it to parquet.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
scon : SparkContext
|
||||
The spark context.
|
||||
spark : SparkSession
|
||||
The SQL session.
|
||||
path : string
|
||||
The path where the file with data resides.
|
||||
|
||||
Returns
|
||||
-------
|
||||
stationFrame : DataFrame
|
||||
The spark Data Frame with the countries data.
|
||||
"""
|
||||
countrylines = scon.textFile(path + "ghcnd-countries.txt")
|
||||
countrysplitlines = countrylines.map(lambda l: (l[0:2], l[2:50]))
|
||||
countryschema = StructType([
|
||||
StructField('countrycode', StringType(), True),
|
||||
StructField('countryname', StringType(), True)])
|
||||
countryframe = spark.createDataFrame(countrysplitlines, countryschema)
|
||||
countryframe.createOrReplaceTempView("ghcndcountries")
|
||||
countryframe.write.mode('overwrite').parquet(
|
||||
GHCNDPATH + "ghcndcountries.parquet")
|
||||
countryframe.cache()
|
||||
print("Imported GhcndCountries")
|
||||
return countryframe
|
||||
|
||||
|
||||
def conv_data_line(line):
|
||||
"""
|
||||
Convert a data line from GHCND-Datafile (.dly).
|
||||
|
||||
Parameters
|
||||
----------
|
||||
line : string
|
||||
String with a data line containing the values for one month.
|
||||
|
||||
Returns
|
||||
-------
|
||||
list of tuple
|
||||
List containing a tuple for each data value.
|
||||
"""
|
||||
if line == '':
|
||||
return []
|
||||
|
||||
countrycode = line[0:2]
|
||||
networkcode = line[2:3]
|
||||
stationid = line[0:11]
|
||||
year = int(line[11:15])
|
||||
month = int(line[15:17])
|
||||
element = line[17:21]
|
||||
datlst = []
|
||||
for i in range(0, 30):
|
||||
val = conv_data_value(line, 21 + i*8)
|
||||
if val != -9999:
|
||||
datlst.append((countrycode, networkcode, stationid,
|
||||
year, month, i+1,
|
||||
date(year, month, i+1),
|
||||
element,
|
||||
val))
|
||||
return datlst
|
||||
|
||||
|
||||
def read_dly_file(scon, spark, filename):
|
||||
"""
|
||||
Read a .dly-file into a data frame.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
scon : SparkContext
|
||||
The spark context.
|
||||
spark : SparkSession
|
||||
The SQL session.
|
||||
filename : string
|
||||
The name and path of the dly-File.
|
||||
|
||||
Returns
|
||||
-------
|
||||
RDD
|
||||
The RDD with the contents of the dly-File.
|
||||
"""
|
||||
dly = scon.textFile(filename)
|
||||
return process_dly_file_lines(spark, dly)
|
||||
|
||||
|
||||
def process_dly_file_lines(spark, lines):
|
||||
"""
|
||||
Process the lines of one dly file.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
spark : SparkSession
|
||||
The SQL session.
|
||||
lines : RDD
|
||||
RDD with one value per line.
|
||||
|
||||
Returns
|
||||
-------
|
||||
dlyFrame : DataFram
|
||||
Data Frame containing the data of the file.
|
||||
|
||||
"""
|
||||
dlsplit = lines.flatMap(conv_data_line)
|
||||
dlyfileschema = StructType([
|
||||
StructField('countrycode', StringType(), True),
|
||||
StructField('networkcode', StringType(), True),
|
||||
StructField('stationid', StringType(), True),
|
||||
StructField('year', IntegerType(), True),
|
||||
StructField('month', IntegerType(), True),
|
||||
StructField('day', IntegerType(), True),
|
||||
StructField('date', DateType(), True),
|
||||
StructField('element', StringType(), True),
|
||||
StructField('value', IntegerType(), True)
|
||||
])
|
||||
dlyframe = spark.createDataFrame(dlsplit, dlyfileschema)
|
||||
return dlyframe
|
||||
|
||||
|
||||
def import_data_rdd_parallel(scon, spark, path):
|
||||
"""
|
||||
Import the data files from ghcnd in parallel.
|
||||
|
||||
This is much faster on a cluster or a computer with many cores
|
||||
and enough main memory to hold all the raw data.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
scon : SparkContext
|
||||
The context.
|
||||
spark : SparkSession
|
||||
The SQL session.
|
||||
|
||||
Returns
|
||||
-------
|
||||
None.
|
||||
"""
|
||||
rdd = scon.textFile(
|
||||
path+"/ghcnd_all/*.dly", minPartitions=5000)
|
||||
rddcoa = rdd.coalesce(5000)
|
||||
|
||||
rddsplit = rddcoa.flatMap(conv_data_line)
|
||||
print("Number of data records = " + str(rddsplit.count()))
|
||||
print("Number of partitions = " + str(rddsplit.getNumPartitions()))
|
||||
|
||||
dlyfileschema = StructType([
|
||||
StructField('countrycode', StringType(), True),
|
||||
StructField('networkcode', StringType(), True),
|
||||
StructField('stationid', StringType(), True),
|
||||
StructField('year', IntegerType(), True),
|
||||
StructField('month', IntegerType(), True),
|
||||
StructField('day', IntegerType(), True),
|
||||
StructField('date', DateType(), True),
|
||||
StructField('element', StringType(), True),
|
||||
StructField('value', IntegerType(), True)
|
||||
])
|
||||
dlyframe = spark.createDataFrame(rddsplit, dlyfileschema)
|
||||
|
||||
dlyframe.show(10)
|
||||
|
||||
dlyframe.write.mode('overwrite').parquet(
|
||||
GHCNDPATH + "ghcnddata.parquet")
|
||||
print(os.system("hdfs dfs -du -s /ghcnd/ghcnddata.parquet"))
|
||||
|
||||
|
||||
def import_data_rdd_parallel_whole(scon, spark, path):
|
||||
"""
|
||||
Import the data files from ghcnd in parallel.
|
||||
|
||||
This is much faster on a cluster or a computer with many cores
|
||||
and enough main memory to hold all the raw data.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
scon : SparkContext
|
||||
The context.
|
||||
spark : SparkSession
|
||||
The SQL session.
|
||||
|
||||
Returns
|
||||
-------
|
||||
None.
|
||||
"""
|
||||
rdd = scon.wholeTextFiles(
|
||||
path+"/ghcnd_all/*.dly", minPartitions=5000 )
|
||||
|
||||
rddvals = rdd.values()
|
||||
print("Number of files in GHCND = " + str(rddvals.count()))
|
||||
rddlen = rddvals.map(len)
|
||||
print("Number of characters in all files = " +
|
||||
str(rddlen.reduce(lambda x, y: x + y)))
|
||||
|
||||
rddlines = rddvals.flatMap(lambda x: x.split("\n"))
|
||||
print("Number of lines with data = " + str(rddlines.count()))
|
||||
|
||||
rddsplit = rddlines.flatMap(conv_data_line)
|
||||
print("Number of data records = " + str(rddsplit.count()))
|
||||
print("Number of partitions = " + str(rddsplit.getNumPartitions()))
|
||||
|
||||
dlyfileschema = StructType([
|
||||
StructField('countrycode', StringType(), True),
|
||||
StructField('networkcode', StringType(), True),
|
||||
StructField('stationid', StringType(), True),
|
||||
StructField('year', IntegerType(), True),
|
||||
StructField('month', IntegerType(), True),
|
||||
StructField('day', IntegerType(), True),
|
||||
StructField('date', DateType(), True),
|
||||
StructField('element', StringType(), True),
|
||||
StructField('value', IntegerType(), True)
|
||||
])
|
||||
dlyframe = spark.createDataFrame(rddsplit, dlyfileschema)
|
||||
|
||||
dlyframe.show(10)
|
||||
|
||||
dlyframe.write.mode('overwrite').parquet(
|
||||
GHCNDPATH + "ghcnddata.parquet")
|
||||
print(os.system("hdfs dfs -du -s /ghcnd/ghcnddata.parquet"))
|
||||
|
||||
"""
|
||||
Code for testing problems that resulted finally from empty lines
|
||||
to solve the problem the code
|
||||
if line == '':
|
||||
return []
|
||||
was added at the beginning of convDataLine to filter away empty lines:
|
||||
|
||||
noyear = rddsplit.filter(lambda x: not x[3].isnumeric())
|
||||
noyear.collect()
|
||||
|
||||
rddlines1 = rdd.flatMap(lambda x: [(x[0], y) for y in x[1].split("\n")])
|
||||
print(rddlines1.count())
|
||||
|
||||
rddsplit1 = rddlines1.flatMap(convDataLine1)
|
||||
print(rddsplit1.count())
|
||||
|
||||
noyear1 = rddsplit1.filter(lambda x: not x[1][3].isnumeric())
|
||||
noyear1.collect()
|
||||
"""
|
||||
|
||||
|
||||
def import_ghcnd_files_extern(scon, spark, path, stationlist, batchsize,
|
||||
numparts):
|
||||
"""
|
||||
Import multiple data files in one batch.
|
||||
|
||||
Import batchsize data files in one batch and append the data into
|
||||
the parquet file.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
scon : SparkContext
|
||||
The context.
|
||||
spark : SparkSession
|
||||
The SQL session.
|
||||
path : string
|
||||
Path of the data files.
|
||||
stationlist : list
|
||||
List of all stations to load.
|
||||
batchsize : int
|
||||
Number of files to load in one batch.
|
||||
numparts : int
|
||||
Number of partitions to write one batch.
|
||||
|
||||
Returns
|
||||
-------
|
||||
None.
|
||||
|
||||
"""
|
||||
data = None
|
||||
count = 0
|
||||
allcount = 0
|
||||
batchcount = 0
|
||||
for station in stationlist:
|
||||
# filename = "file://" + path + "/" + station + ".dly"
|
||||
filename = path + station + ".dly"
|
||||
if os.path.isfile(filename):
|
||||
dly = read_dly_file(spark, scon, "file://" + filename)
|
||||
if data is not None:
|
||||
data = data.union(dly)
|
||||
print("Batch " + str(batchcount) +
|
||||
" Filenr " + str(count) + " Processing " + filename)
|
||||
else:
|
||||
tstart = time()
|
||||
data = dly
|
||||
count += 1
|
||||
if count >= batchsize:
|
||||
# data = data.sort('countrycode', 'stationid', 'date')
|
||||
data = data.coalesce(numparts)
|
||||
tcoalesce = time()
|
||||
data.write.mode('Append').parquet(
|
||||
GHCNDPATH + "ghcnddata.parquet")
|
||||
anzrec = data.count()
|
||||
twrite = time()
|
||||
print(
|
||||
"\n\nBatch " + str(batchcount) +
|
||||
" #recs " + str(anzrec) +
|
||||
" #files " + str(allcount) +
|
||||
" readtime " + str.format("{:f}", tcoalesce - tstart) +
|
||||
" writetime " + str.format("{:f}", twrite - tcoalesce) +
|
||||
" recs/sec " +
|
||||
str.format("{:f}", anzrec / (twrite - tstart)) + "\n\n")
|
||||
allcount += count
|
||||
count = 0
|
||||
batchcount += 1
|
||||
data = None
|
||||
else:
|
||||
print("importGhcndFilesExtern: " + station +
|
||||
", " + filename + " not found")
|
||||
if data is not None:
|
||||
data = data.coalesce(numparts)
|
||||
data.write.mode('Append').parquet(GHCNDPATH + "ghcnddata.parquet")
|
||||
|
||||
|
||||
def import_all_data(scon, spark, path):
|
||||
"""
|
||||
Import all data from GHCND.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
scon : SparkContext
|
||||
The context.
|
||||
spark : SparkSession
|
||||
The SQL session.
|
||||
path : string
|
||||
Path of data files.
|
||||
|
||||
Returns
|
||||
-------
|
||||
None.
|
||||
|
||||
"""
|
||||
stationlist = spark.sql(
|
||||
"SELECT stationid AS station \
|
||||
FROM ghcndstations \
|
||||
ORDER BY station")
|
||||
pds = stationlist.toPandas()
|
||||
import_ghcnd_files_extern(scon, spark, path + "ghcnd_all/",
|
||||
pds.station, 30, 1)
|
||||
|
||||
|
||||
def import_data_single_files(scon, spark, stationlist, parquetname, path):
|
||||
"""
|
||||
Import the data files one by one.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
scon : SparkContext
|
||||
The context.
|
||||
spark : SparkSession
|
||||
The SQL session.
|
||||
stationlist : list
|
||||
List of all stations to import data.
|
||||
parquetname : string
|
||||
Name of the parquet file to write the data to.
|
||||
path : string
|
||||
Path where the data files reside.
|
||||
|
||||
Returns
|
||||
-------
|
||||
None.
|
||||
|
||||
"""
|
||||
pds = stationlist.toPandas()
|
||||
cnt = 0
|
||||
for station in pds.station:
|
||||
filename = path + station + ".dly"
|
||||
if os.path.isfile(filename):
|
||||
start = time()
|
||||
dly = read_dly_file(spark, scon, "file://" + filename)
|
||||
numrec = dly.count()
|
||||
dly = dly.coalesce(1).sort('element', 'date')
|
||||
read = time()
|
||||
dly.write.mode('Append').parquet(GHCNDPATH
|
||||
+ parquetname + ".parquet")
|
||||
finish = time()
|
||||
print(str.format(
|
||||
"{:8d} ", cnt) + station +
|
||||
" #rec " + str.format("{:7d}", numrec) +
|
||||
" read " + str.format("{:f}", read - start) +
|
||||
" write " + str.format("{:f}", finish - read) +
|
||||
" write/sec " + str.format("importDataSingleFiles{:f} ",
|
||||
numrec/(finish - read))
|
||||
+ " " + filename)
|
||||
else:
|
||||
print("#### " + str(cnt) + " File " +
|
||||
filename + " does not exist ####")
|
||||
cnt += 1
|
||||
|
||||
|
||||
def check_files(spark):
|
||||
"""
|
||||
Check if some files for generated stationnames do not exist.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
spark : SparkSession
|
||||
The SQL session.
|
||||
|
||||
Returns
|
||||
-------
|
||||
None.
|
||||
|
||||
"""
|
||||
stationlist = spark.sql(
|
||||
"SELECT CONCAT(countrycode, networkcode, stationid) AS station \
|
||||
FROM ghcndstations \
|
||||
ORDER BY station")
|
||||
pds = stationlist.toPandas()
|
||||
count = 1
|
||||
for station in pds.station:
|
||||
filename = "/nfs/home/steger/ghcnd/ghcnd_all/" + station + ".dly"
|
||||
if os.path.isfile(filename):
|
||||
# print(str(count) + " " + station)
|
||||
pass
|
||||
else:
|
||||
print(str(count) + " File does not exist: " + filename)
|
||||
count += 1
|
||||
|
||||
"""
|
||||
Read the inventory data into a dataframe,
|
||||
register it as temporary view and write it to parquet
|
||||
"""
|
||||
|
||||
|
||||
def import_ghcnd_inventory(scon, spark, path):
|
||||
"""
|
||||
Import inventory information from GHCND.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
scon : SparkContext
|
||||
The context.
|
||||
spark : SparkSession
|
||||
The SQL session.
|
||||
path : string
|
||||
Path for inventory file.
|
||||
|
||||
Returns
|
||||
-------
|
||||
invframe : DataFrame
|
||||
Data Frame with inventory data.
|
||||
|
||||
"""
|
||||
invlines = scon.textFile(path + "ghcnd-inventory.txt")
|
||||
invsplitlines = invlines.map(
|
||||
lambda l:
|
||||
(l[0:2],
|
||||
l[2:3],
|
||||
l[0:11],
|
||||
float(l[12:20].strip()),
|
||||
float(l[21:30].strip()),
|
||||
l[31:35],
|
||||
int(l[36:40]),
|
||||
int(l[41:45])
|
||||
))
|
||||
invschema = StructType([
|
||||
StructField('countrycode', StringType(), True),
|
||||
StructField('networkcode', StringType(), True),
|
||||
StructField('stationid', StringType(), True),
|
||||
StructField('latitude', FloatType(), True),
|
||||
StructField('longitude', FloatType(), True),
|
||||
StructField('element', StringType(), True),
|
||||
StructField('firstyear', IntegerType(), True),
|
||||
StructField('lastyear', IntegerType(), True)
|
||||
])
|
||||
invframe = spark.createDataFrame(invsplitlines, invschema)
|
||||
invframe.createOrReplaceTempView("ghcndinventory")
|
||||
invframe.write.mode('overwrite').parquet(
|
||||
GHCNDPATH + "ghcndinventory.parquet")
|
||||
invframe.cache()
|
||||
print("Imported GhcndInventory")
|
||||
return invframe
|
||||
|
||||
|
||||
def import_ghcnd_all(scon, spark):
|
||||
"""
|
||||
Import all files from GHCND.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
scon : SparkContext
|
||||
The context.
|
||||
spark : SparkSession
|
||||
The SQL session.
|
||||
|
||||
Returns
|
||||
-------
|
||||
None.
|
||||
|
||||
"""
|
||||
localfilepath = "file://" + GHCNDHOMEPATH
|
||||
import_ghcnd_countries(scon, spark, localfilepath)
|
||||
import_ghcnd_stations(scon, spark, localfilepath)
|
||||
import_ghcnd_inventory(scon, spark, localfilepath)
|
||||
# import_all_data(scon, spark, GHCNDHOMEPATH)
|
||||
import_data_rdd_parallel(scon, spark, localfilepath)
|
||||
|
||||
|
||||
def read_ghcnd_from_parquet(spark):
|
||||
"""
|
||||
Read all data from the parquet files into Dataframes.
|
||||
|
||||
Create temporary views from the parquet files.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
spark : SparkSession
|
||||
The SQL Session.
|
||||
|
||||
Returns
|
||||
-------
|
||||
None.
|
||||
|
||||
"""
|
||||
dfcountries = spark.read.parquet(GHCNDPATH + "ghcndcountries")
|
||||
dfcountries.createOrReplaceTempView("ghcndcountries")
|
||||
dfcountries.cache()
|
||||
|
||||
dfstations = spark.read.parquet(GHCNDPATH + "ghcndstations")
|
||||
dfstations.createOrReplaceTempView("ghcndstations")
|
||||
dfstations.cache()
|
||||
|
||||
dfinventory = spark.read.parquet(GHCNDPATH + "ghcndinventory")
|
||||
dfinventory.createOrReplaceTempView("ghcndinventory")
|
||||
dfinventory.cache()
|
||||
|
||||
dfdata = spark.read.parquet(GHCNDPATH + "ghcnddata")
|
||||
dfdata.createOrReplaceTempView("ghcnddata")
|
||||
dfdata.cache()
|
||||
|
||||
|
||||
def delete_all_parquet_ghcnd():
|
||||
"""
|
||||
Delete all parquet files that were imported from GHCND.
|
||||
|
||||
Returns
|
||||
-------
|
||||
None.
|
||||
|
||||
"""
|
||||
delete_from_hdfs(GHCNDPATH + "ghcndstations.parquet")
|
||||
delete_from_hdfs(GHCNDPATH + "ghcndcountries.parquet")
|
||||
delete_from_hdfs(GHCNDPATH + "ghcndinventory.parquet")
|
||||
delete_from_hdfs(GHCNDPATH + "ghcnddata.parquet")
|
||||
|
||||
|
||||
def delete_from_hdfs(path):
|
||||
"""
|
||||
Delete the file in path from HDFS.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
path : string
|
||||
Path of the file in HDFS.
|
||||
|
||||
Returns
|
||||
-------
|
||||
None.
|
||||
|
||||
"""
|
||||
call("hdfs dfs -rm -R " + path,
|
||||
shell=True)
|
||||
366
Aufgabe 11/Aufgabe11.py
Normal file
366
Aufgabe 11/Aufgabe11.py
Normal file
@@ -0,0 +1,366 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from sparkstart import scon, spark
|
||||
from pyspark.sql import SparkSession
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
|
||||
HDFSPATH = "hdfs://193.174.205.250:54310/"
|
||||
|
||||
|
||||
def read_parquet_tables(spark: SparkSession) -> None:
|
||||
"""Load station master data and hourly measurements from parquet if needed."""
|
||||
stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
|
||||
products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
|
||||
|
||||
stations_df = spark.read.parquet(stations_path)
|
||||
stations_df.createOrReplaceTempView("german_stations")
|
||||
stations_df.cache()
|
||||
|
||||
products_df = spark.read.parquet(products_path)
|
||||
products_df.createOrReplaceTempView("german_stations_data")
|
||||
products_df.cache()
|
||||
|
||||
|
||||
def _escape_like(value: str) -> str:
|
||||
"""Escape single quotes for safe SQL literal usage."""
|
||||
return value.replace("'", "''")
|
||||
|
||||
|
||||
def resolve_station_id(spark: SparkSession, station_identifier) -> int:
|
||||
"""Resolve station id either from int input or fuzzy name search."""
|
||||
if isinstance(station_identifier, int):
|
||||
return station_identifier
|
||||
if isinstance(station_identifier, str) and station_identifier.strip().isdigit():
|
||||
return int(station_identifier.strip())
|
||||
if isinstance(station_identifier, str):
|
||||
needle = _escape_like(station_identifier.lower())
|
||||
q = (
|
||||
"SELECT stationId FROM german_stations "
|
||||
f"WHERE lower(station_name) LIKE '%{needle}%' ORDER BY station_name LIMIT 1"
|
||||
)
|
||||
result = spark.sql(q).collect()
|
||||
if not result:
|
||||
raise ValueError(f"No station found for pattern '{station_identifier}'")
|
||||
return int(result[0]["stationId"])
|
||||
raise ValueError("station_identifier must be int or str")
|
||||
|
||||
|
||||
def build_station_rollup_for_station(spark: SparkSession, station_identifier) -> None:
|
||||
"""Create rollup view with min/max/avg per hour/day/month/quarter/year."""
|
||||
station_id = resolve_station_id(spark, station_identifier)
|
||||
q = f"""
|
||||
WITH base AS (
|
||||
SELECT
|
||||
d.stationId,
|
||||
gs.station_name,
|
||||
TO_TIMESTAMP(CONCAT(d.date, LPAD(CAST(d.hour AS STRING), 2, '0')), 'yyyyMMddHH') AS hour_ts,
|
||||
TO_DATE(d.date, 'yyyyMMdd') AS day_date,
|
||||
MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS month_in_year,
|
||||
QUARTER(TO_DATE(d.date, 'yyyyMMdd')) AS quarter_in_year,
|
||||
YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS year_value,
|
||||
d.TT_TU AS temperature
|
||||
FROM german_stations_data d
|
||||
JOIN german_stations gs ON d.stationId = gs.stationId
|
||||
WHERE d.stationId = {station_id}
|
||||
AND d.TT_TU IS NOT NULL
|
||||
AND d.TT_TU <> -999
|
||||
),
|
||||
rollup_base AS (
|
||||
SELECT
|
||||
stationId,
|
||||
station_name,
|
||||
hour_ts,
|
||||
day_date,
|
||||
month_in_year,
|
||||
quarter_in_year,
|
||||
year_value,
|
||||
MIN(temperature) AS min_temp,
|
||||
MAX(temperature) AS max_temp,
|
||||
AVG(temperature) AS avg_temp
|
||||
FROM base
|
||||
GROUP BY stationId, station_name, ROLLUP(year_value, quarter_in_year, month_in_year, day_date, hour_ts)
|
||||
)
|
||||
SELECT
|
||||
stationId,
|
||||
station_name,
|
||||
hour_ts,
|
||||
day_date,
|
||||
month_in_year,
|
||||
quarter_in_year,
|
||||
year_value,
|
||||
CASE WHEN month_in_year IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-', LPAD(CAST(month_in_year AS STRING), 2, '0'), '-01')) END AS month_start_date,
|
||||
CASE WHEN quarter_in_year IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-', LPAD(CAST(quarter_in_year * 3 - 2 AS STRING), 2, '0'), '-01')) END AS quarter_start_date,
|
||||
CASE WHEN year_value IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-01-01')) END AS year_start_date,
|
||||
min_temp,
|
||||
max_temp,
|
||||
avg_temp
|
||||
FROM rollup_base
|
||||
"""
|
||||
rollup_df = spark.sql(q)
|
||||
rollup_df.cache()
|
||||
rollup_df.createOrReplaceTempView("station_rollup")
|
||||
|
||||
|
||||
def _year_window(spark: SparkSession, years_back: int, station_id: int) -> tuple[int, int] | None:
|
||||
stats = spark.sql(
|
||||
f"SELECT MIN(year_value) AS min_year, MAX(year_value) AS max_year FROM station_rollup WHERE year_value IS NOT NULL AND stationId = {station_id}"
|
||||
).collect()
|
||||
if not stats or stats[0]["max_year"] is None:
|
||||
return None
|
||||
min_year = int(stats[0]["min_year"])
|
||||
max_year = int(stats[0]["max_year"])
|
||||
start_year = max(min_year, max_year - years_back + 1)
|
||||
return start_year, max_year
|
||||
|
||||
|
||||
def plot_station_rollup_levels(
|
||||
spark: SparkSession,
|
||||
station_identifier,
|
||||
day_span_years: int = 3,
|
||||
agg_span_years: int = 15,
|
||||
) -> None:
|
||||
"""Plot day, month, quarter, and year aggregates for the given station."""
|
||||
station_id = resolve_station_id(spark, station_identifier)
|
||||
needs_refresh = not spark.catalog.tableExists("station_rollup")
|
||||
if not needs_refresh:
|
||||
count = spark.sql(
|
||||
f"SELECT COUNT(*) AS cnt FROM station_rollup WHERE stationId = {station_id}"
|
||||
).collect()[0]["cnt"]
|
||||
needs_refresh = count == 0
|
||||
if needs_refresh:
|
||||
build_station_rollup_for_station(spark, station_id)
|
||||
|
||||
day_window = _year_window(spark, day_span_years, station_id)
|
||||
if day_window is None:
|
||||
print("No data available for plotting")
|
||||
return
|
||||
month_window = _year_window(spark, agg_span_years, station_id)
|
||||
if month_window is None:
|
||||
print("No aggregated window available")
|
||||
return
|
||||
|
||||
def _plot(query: str, figure_idx: int, title: str, x_col: str = "bucket_date") -> None:
|
||||
pdf = spark.sql(query).toPandas()
|
||||
if pdf.empty:
|
||||
print(f"No data for {title}")
|
||||
return
|
||||
plt.figure(num=figure_idx)
|
||||
plt.clf()
|
||||
metrics = [
|
||||
("min_temp", "Min", "#1f77b4"),
|
||||
("avg_temp", "Avg", "#ff7f0e"),
|
||||
("max_temp", "Max", "#2ca02c"),
|
||||
]
|
||||
for col, label, color in metrics:
|
||||
if col in pdf:
|
||||
plt.plot(pdf[x_col], pdf[col], label=label, color=color)
|
||||
plt.title(title)
|
||||
plt.xlabel("Datum")
|
||||
plt.ylabel("Temperatur (°C)")
|
||||
plt.legend()
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
|
||||
day_start, day_end = day_window
|
||||
q_day = f"""
|
||||
SELECT day_date AS bucket_date, min_temp, avg_temp, max_temp
|
||||
FROM station_rollup
|
||||
WHERE stationId = {station_id}
|
||||
AND hour_ts IS NULL
|
||||
AND day_date IS NOT NULL
|
||||
AND year_value BETWEEN {day_start} AND {day_end}
|
||||
ORDER BY bucket_date
|
||||
"""
|
||||
_plot(q_day, 1, f"Tagesmittelwerte {day_start}-{day_end}")
|
||||
|
||||
agg_start, agg_end = month_window
|
||||
q_month = f"""
|
||||
SELECT month_start_date AS bucket_date, min_temp, avg_temp, max_temp
|
||||
FROM station_rollup
|
||||
WHERE stationId = {station_id}
|
||||
AND day_date IS NULL
|
||||
AND month_in_year IS NOT NULL
|
||||
AND year_value BETWEEN {agg_start} AND {agg_end}
|
||||
ORDER BY bucket_date
|
||||
"""
|
||||
_plot(q_month, 2, f"Monatsmittelwerte {agg_start}-{agg_end}")
|
||||
|
||||
q_quarter = f"""
|
||||
SELECT quarter_start_date AS bucket_date, min_temp, avg_temp, max_temp
|
||||
FROM station_rollup
|
||||
WHERE stationId = {station_id}
|
||||
AND month_in_year IS NULL
|
||||
AND quarter_in_year IS NOT NULL
|
||||
AND year_value BETWEEN {agg_start} AND {agg_end}
|
||||
ORDER BY bucket_date
|
||||
"""
|
||||
_plot(q_quarter, 3, f"Quartalsmittelwerte {agg_start}-{agg_end}")
|
||||
|
||||
q_year = f"""
|
||||
SELECT year_start_date AS bucket_date, min_temp, avg_temp, max_temp
|
||||
FROM station_rollup
|
||||
WHERE stationId = {station_id}
|
||||
AND quarter_in_year IS NULL
|
||||
AND year_value IS NOT NULL
|
||||
ORDER BY bucket_date
|
||||
"""
|
||||
_plot(q_year, 4, "Jahresmittelwerte")
|
||||
|
||||
|
||||
def create_tempmonat(spark: SparkSession) -> None:
|
||||
"""Create cached temp table tempmonat with monthly aggregates per station."""
|
||||
q = """
|
||||
SELECT
|
||||
d.stationId,
|
||||
gs.station_name,
|
||||
YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS year_value,
|
||||
MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS month_value,
|
||||
MIN(d.TT_TU) AS min_temp,
|
||||
MAX(d.TT_TU) AS max_temp,
|
||||
AVG(d.TT_TU) AS avg_temp
|
||||
FROM german_stations_data d
|
||||
JOIN german_stations gs ON d.stationId = gs.stationId
|
||||
WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999
|
||||
GROUP BY d.stationId, gs.station_name, YEAR(TO_DATE(d.date, 'yyyyMMdd')), MONTH(TO_DATE(d.date, 'yyyyMMdd'))
|
||||
"""
|
||||
monthly_df = spark.sql(q)
|
||||
monthly_df.cache()
|
||||
monthly_df.createOrReplaceTempView("tempmonat")
|
||||
|
||||
|
||||
def rank_coldest_per_month_2015(spark: SparkSession):
|
||||
"""Rank stations by coldest values per month for 2015 using tempmonat."""
|
||||
return spark.sql(
|
||||
"""
|
||||
SELECT
|
||||
stationId,
|
||||
station_name,
|
||||
year_value,
|
||||
month_value,
|
||||
min_temp,
|
||||
max_temp,
|
||||
avg_temp,
|
||||
RANK() OVER (PARTITION BY month_value ORDER BY min_temp ASC) AS rank_min,
|
||||
RANK() OVER (PARTITION BY month_value ORDER BY max_temp ASC) AS rank_max,
|
||||
RANK() OVER (PARTITION BY month_value ORDER BY avg_temp ASC) AS rank_avg
|
||||
FROM tempmonat
|
||||
WHERE year_value = 2015
|
||||
ORDER BY rank_min, month_value
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def rank_coldest_overall(spark: SparkSession):
|
||||
"""Rank stations by coldest values over all months/years (no partition)."""
|
||||
return spark.sql(
|
||||
"""
|
||||
SELECT
|
||||
stationId,
|
||||
station_name,
|
||||
year_value,
|
||||
month_value,
|
||||
min_temp,
|
||||
max_temp,
|
||||
avg_temp,
|
||||
RANK() OVER (ORDER BY min_temp ASC) AS rank_min,
|
||||
RANK() OVER (ORDER BY max_temp ASC) AS rank_max,
|
||||
RANK() OVER (ORDER BY avg_temp ASC) AS rank_avg
|
||||
FROM tempmonat
|
||||
ORDER BY rank_min
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def create_grouping_sets_overview(spark: SparkSession) -> None:
|
||||
"""Compute grouping sets for requested aggregations and cache the result."""
|
||||
q = """
|
||||
WITH base AS (
|
||||
SELECT
|
||||
YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS year_value,
|
||||
MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS month_value,
|
||||
gs.bundesland,
|
||||
gs.stationId,
|
||||
gs.station_name,
|
||||
d.TT_TU AS temperature
|
||||
FROM german_stations_data d
|
||||
JOIN german_stations gs ON d.stationId = gs.stationId
|
||||
WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999
|
||||
)
|
||||
SELECT
|
||||
year_value,
|
||||
month_value,
|
||||
bundesland,
|
||||
stationId,
|
||||
station_name,
|
||||
MIN(temperature) AS min_temp,
|
||||
MAX(temperature) AS max_temp,
|
||||
AVG(temperature) AS avg_temp
|
||||
FROM base
|
||||
GROUP BY GROUPING SETS (
|
||||
(year_value, bundesland),
|
||||
(year_value, stationId, station_name, bundesland),
|
||||
(month_value, bundesland)
|
||||
)
|
||||
"""
|
||||
grouped_df = spark.sql(q)
|
||||
grouped_df.cache()
|
||||
grouped_df.createOrReplaceTempView("grouping_sets_stats")
|
||||
|
||||
|
||||
def select_year_bundesland(spark: SparkSession):
|
||||
return spark.sql(
|
||||
"""
|
||||
SELECT year_value, bundesland, min_temp, max_temp, avg_temp
|
||||
FROM grouping_sets_stats
|
||||
WHERE bundesland IS NOT NULL AND month_value IS NULL AND stationId IS NULL
|
||||
ORDER BY year_value, bundesland
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def select_year_station(spark: SparkSession):
|
||||
return spark.sql(
|
||||
"""
|
||||
SELECT year_value, stationId, station_name, min_temp, max_temp, avg_temp
|
||||
FROM grouping_sets_stats
|
||||
WHERE stationId IS NOT NULL AND month_value IS NULL
|
||||
ORDER BY year_value, stationId
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def select_month_bundesland(spark: SparkSession):
|
||||
return spark.sql(
|
||||
"""
|
||||
SELECT month_value, bundesland, min_temp, max_temp, avg_temp
|
||||
FROM grouping_sets_stats
|
||||
WHERE month_value IS NOT NULL AND year_value IS NULL
|
||||
ORDER BY month_value, bundesland
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def main(scon, spark):
|
||||
read_parquet_tables(spark)
|
||||
build_station_rollup_for_station(spark, "kempten")
|
||||
plot_station_rollup_levels(spark, "kempten")
|
||||
|
||||
create_tempmonat(spark)
|
||||
print("Rangfolgen 2015 je Monat:")
|
||||
rank_coldest_per_month_2015(spark).show(36, truncate=False)
|
||||
print("Rangfolgen gesamt:")
|
||||
rank_coldest_overall(spark).show(36, truncate=False)
|
||||
|
||||
create_grouping_sets_overview(spark)
|
||||
print("Jahr vs Bundesland:")
|
||||
select_year_bundesland(spark).show(20, truncate=False)
|
||||
print("Jahr vs Station:")
|
||||
select_year_station(spark).show(20, truncate=False)
|
||||
print("Monat vs Bundesland:")
|
||||
select_month_bundesland(spark).show(20, truncate=False)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main(scon, spark)
|
||||
22
Aufgabe 11/sparkstart.py
Normal file
22
Aufgabe 11/sparkstart.py
Normal file
@@ -0,0 +1,22 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
Erzeugen einer Spark-Konfiguration
|
||||
"""
|
||||
|
||||
from pyspark import SparkConf, SparkContext
|
||||
from pyspark.sql import SparkSession
|
||||
|
||||
# connect to cluster
|
||||
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
|
||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
||||
conf.set("spark.executor.memory", '32g')
|
||||
conf.set("spark.driver.memory", '8g')
|
||||
conf.set("spark.cores.max", "40")
|
||||
scon = SparkContext(conf=conf)
|
||||
|
||||
|
||||
spark = SparkSession \
|
||||
.builder \
|
||||
.appName("Python Spark SQL") \
|
||||
.getOrCreate()
|
||||
276
Aufgabe 12/Aufgabe12.py
Normal file
276
Aufgabe 12/Aufgabe12.py
Normal file
@@ -0,0 +1,276 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Iterable, Sequence
|
||||
|
||||
from pyspark.sql import SparkSession, functions as F, types as T
|
||||
|
||||
from sparkstart import scon, spark
|
||||
|
||||
|
||||
HDFSPATH = "hdfs://193.174.205.250:54310/"
|
||||
|
||||
|
||||
_DATE_FALLBACK_EXPR = "COALESCE(date_value, TO_DATE(date_str), TO_DATE(date_str, 'yyyyMMdd'))"
|
||||
|
||||
|
||||
def _resolve_column_name(columns: Sequence[str], candidates: Iterable[str]) -> str:
|
||||
|
||||
lowered = {col.lower(): col for col in columns}
|
||||
for candidate in candidates:
|
||||
match = lowered.get(candidate.lower())
|
||||
if match:
|
||||
return match
|
||||
raise ValueError(f"None of the candidate columns {list(candidates)} exist in {columns}")
|
||||
|
||||
|
||||
def _normalize_stocks_view(spark: SparkSession) -> None:
|
||||
|
||||
stocks_path = HDFSPATH + "stocks/stocks.parquet"
|
||||
stocks_df = spark.read.parquet(stocks_path)
|
||||
|
||||
symbol_col = _resolve_column_name(stocks_df.columns, ("symbol", "ticker"))
|
||||
date_col = _resolve_column_name(stocks_df.columns, ("date", "pricedate", "dt"))
|
||||
close_col = _resolve_column_name(stocks_df.columns, ("close", "closeprice", "closingprice"))
|
||||
|
||||
stocks_df = (
|
||||
stocks_df
|
||||
.select(
|
||||
F.col(symbol_col).alias("symbol"),
|
||||
F.col(date_col).alias("raw_date"),
|
||||
F.col(close_col).alias("close_raw"),
|
||||
)
|
||||
.withColumn("date_str", F.col("raw_date").cast("string"))
|
||||
)
|
||||
|
||||
date_candidates = [
|
||||
F.col("raw_date").cast("date"),
|
||||
F.to_date("raw_date"),
|
||||
F.to_date("date_str"),
|
||||
F.to_date("date_str", "yyyyMMdd"),
|
||||
F.to_date("date_str", "MM/dd/yyyy"),
|
||||
]
|
||||
|
||||
stocks_df = (
|
||||
stocks_df
|
||||
.withColumn("date_value", F.coalesce(*date_candidates))
|
||||
.withColumn("year_value", F.substring("date_str", 1, 4).cast("int"))
|
||||
.withColumn("close_value", F.col("close_raw").cast("double"))
|
||||
.select("symbol", "date_value", "date_str", "year_value", "close_value")
|
||||
)
|
||||
|
||||
stocks_df.cache()
|
||||
stocks_df.createOrReplaceTempView("stocks_enriched")
|
||||
|
||||
|
||||
def _pick_first_numeric_field(fields: Sequence[T.StructField]) -> str:
|
||||
|
||||
numeric_types = (
|
||||
T.ByteType,
|
||||
T.ShortType,
|
||||
T.IntegerType,
|
||||
T.LongType,
|
||||
T.FloatType,
|
||||
T.DoubleType,
|
||||
T.DecimalType,
|
||||
)
|
||||
for field in fields:
|
||||
if isinstance(field.dataType, numeric_types):
|
||||
return field.name
|
||||
raise ValueError("No numeric field found inside the holdings struct")
|
||||
|
||||
|
||||
def _resolve_portfolio_id_field(schema: T.StructType) -> str:
|
||||
|
||||
priority = ("portfolio_id", "portfolioid", "id")
|
||||
lowered = {field.name.lower(): field.name for field in schema.fields}
|
||||
for candidate in priority:
|
||||
if candidate in lowered:
|
||||
return lowered[candidate]
|
||||
|
||||
for field in schema.fields:
|
||||
if not isinstance(field.dataType, (T.ArrayType, T.MapType)):
|
||||
return field.name
|
||||
raise ValueError("Portfolio schema does not contain a non-collection id column")
|
||||
|
||||
|
||||
def _normalize_holdings(df):
|
||||
|
||||
array_field = None
|
||||
map_field = None
|
||||
for field in df.schema.fields:
|
||||
if isinstance(field.dataType, T.ArrayType) and isinstance(field.dataType.elementType, T.StructType):
|
||||
array_field = field
|
||||
break
|
||||
if isinstance(field.dataType, T.MapType) and isinstance(field.dataType.keyType, T.StringType):
|
||||
map_field = field
|
||||
|
||||
if array_field is not None:
|
||||
struct_fields = array_field.dataType.elementType.fields
|
||||
symbol_field = _resolve_column_name([f.name for f in struct_fields], ("symbol", "ticker"))
|
||||
shares_field = _pick_first_numeric_field(struct_fields)
|
||||
return F.expr(
|
||||
f"transform(`{array_field.name}`, x -> named_struct('symbol', x.`{symbol_field}`, 'shares', CAST(x.`{shares_field}` AS DOUBLE)))"
|
||||
)
|
||||
|
||||
if map_field is not None and isinstance(map_field.dataType.valueType, (T.IntegerType, T.LongType, T.FloatType, T.DoubleType, T.DecimalType)):
|
||||
return F.expr(
|
||||
f"transform(map_entries(`{map_field.name}`), x -> named_struct('symbol', x.key, 'shares', CAST(x.value AS DOUBLE)))"
|
||||
)
|
||||
|
||||
raise ValueError("Could not locate holdings column (array<struct> or map) in portfolio data")
|
||||
|
||||
|
||||
def _normalize_portfolio_view(spark: SparkSession) -> None:
|
||||
|
||||
portfolio_path = HDFSPATH + "stocks/portfolio.parquet"
|
||||
portfolio_df = spark.read.parquet(portfolio_path)
|
||||
|
||||
id_col = _resolve_portfolio_id_field(portfolio_df.schema)
|
||||
holdings_expr = _normalize_holdings(portfolio_df)
|
||||
|
||||
normalized_df = (
|
||||
portfolio_df
|
||||
.select(
|
||||
F.col(id_col).alias("portfolio_id"),
|
||||
holdings_expr.alias("holdings"),
|
||||
)
|
||||
)
|
||||
|
||||
normalized_df.cache()
|
||||
normalized_df.createOrReplaceTempView("portfolio")
|
||||
|
||||
spark.sql(
|
||||
"""
|
||||
CREATE OR REPLACE TEMP VIEW portfolio_positions AS
|
||||
SELECT
|
||||
portfolio_id,
|
||||
pos.symbol AS symbol,
|
||||
pos.shares AS shares
|
||||
FROM portfolio
|
||||
LATERAL VIEW explode(holdings) exploded AS pos
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def register_base_views(spark: SparkSession) -> None:
|
||||
|
||||
_normalize_stocks_view(spark)
|
||||
_normalize_portfolio_view(spark)
|
||||
|
||||
|
||||
def query_first_and_last_listing(spark: SparkSession):
|
||||
|
||||
q = f"""
|
||||
SELECT
|
||||
symbol,
|
||||
MIN({_DATE_FALLBACK_EXPR}) AS first_listing,
|
||||
MAX({_DATE_FALLBACK_EXPR}) AS last_listing
|
||||
FROM stocks_enriched
|
||||
WHERE symbol IS NOT NULL
|
||||
GROUP BY symbol
|
||||
ORDER BY symbol
|
||||
"""
|
||||
return spark.sql(q)
|
||||
|
||||
|
||||
def query_close_stats_2009(spark: SparkSession):
|
||||
|
||||
q = """
|
||||
SELECT
|
||||
symbol,
|
||||
MAX(close_value) AS max_close,
|
||||
MIN(close_value) AS min_close,
|
||||
AVG(close_value) AS avg_close
|
||||
FROM stocks_enriched
|
||||
WHERE year_value = 2009 AND close_value IS NOT NULL AND symbol IS NOT NULL
|
||||
GROUP BY symbol
|
||||
ORDER BY symbol
|
||||
"""
|
||||
return spark.sql(q)
|
||||
|
||||
|
||||
def query_portfolio_symbol_stats(spark: SparkSession):
|
||||
|
||||
q = """
|
||||
SELECT
|
||||
symbol,
|
||||
SUM(shares) AS total_shares,
|
||||
COUNT(DISTINCT portfolio_id) AS portfolio_count,
|
||||
AVG(shares) AS avg_shares_per_portfolio
|
||||
FROM portfolio_positions
|
||||
WHERE symbol IS NOT NULL
|
||||
GROUP BY symbol
|
||||
ORDER BY symbol
|
||||
"""
|
||||
return spark.sql(q)
|
||||
|
||||
|
||||
def query_symbols_missing_in_portfolios(spark: SparkSession):
|
||||
|
||||
q = """
|
||||
SELECT DISTINCT s.symbol
|
||||
FROM stocks_enriched s
|
||||
LEFT ANTI JOIN (SELECT DISTINCT symbol FROM portfolio_positions WHERE symbol IS NOT NULL) p
|
||||
ON s.symbol = p.symbol
|
||||
WHERE s.symbol IS NOT NULL
|
||||
ORDER BY s.symbol
|
||||
"""
|
||||
return spark.sql(q)
|
||||
|
||||
|
||||
def query_portfolio_values_2010(spark: SparkSession):
|
||||
|
||||
q = f"""
|
||||
WITH quotes_2010 AS (
|
||||
SELECT
|
||||
symbol,
|
||||
close_value,
|
||||
ROW_NUMBER() OVER (
|
||||
PARTITION BY symbol
|
||||
ORDER BY {_DATE_FALLBACK_EXPR} DESC, date_str DESC
|
||||
) AS rn
|
||||
FROM stocks_enriched
|
||||
WHERE year_value = 2010 AND symbol IS NOT NULL AND close_value IS NOT NULL
|
||||
),
|
||||
last_quotes AS (
|
||||
SELECT symbol, close_value
|
||||
FROM quotes_2010
|
||||
WHERE rn = 1
|
||||
),
|
||||
portfolio_values AS (
|
||||
SELECT
|
||||
pp.portfolio_id,
|
||||
SUM(pp.shares * lq.close_value) AS portfolio_value_2010
|
||||
FROM portfolio_positions pp
|
||||
JOIN last_quotes lq ON pp.symbol = lq.symbol
|
||||
GROUP BY pp.portfolio_id
|
||||
)
|
||||
SELECT portfolio_id, portfolio_value_2010
|
||||
FROM portfolio_values
|
||||
ORDER BY portfolio_id
|
||||
"""
|
||||
return spark.sql(q)
|
||||
|
||||
|
||||
def main(scon, spark):
|
||||
|
||||
register_base_views(spark)
|
||||
|
||||
print("(a) Erste und letzte Notierung je Symbol:")
|
||||
query_first_and_last_listing(spark).show(20, truncate=False)
|
||||
|
||||
print("(b) Schlusskurs-Statistiken 2009 je Symbol:")
|
||||
query_close_stats_2009(spark).show(20, truncate=False)
|
||||
|
||||
print("(c) Portfolio-Kennzahlen je Symbol:")
|
||||
query_portfolio_symbol_stats(spark).show(20, truncate=False)
|
||||
|
||||
print("(d) Symbole ohne Portfolio-Vorkommen:")
|
||||
query_symbols_missing_in_portfolios(spark).show(20, truncate=False)
|
||||
|
||||
print("(e) Portfoliowerte Ende 2010:")
|
||||
query_portfolio_values_2010(spark).show(20, truncate=False)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main(scon, spark)
|
||||
22
Aufgabe 12/sparkstart.py
Normal file
22
Aufgabe 12/sparkstart.py
Normal file
@@ -0,0 +1,22 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
Erzeugen einer Spark-Konfiguration
|
||||
"""
|
||||
|
||||
from pyspark import SparkConf, SparkContext
|
||||
from pyspark.sql import SparkSession
|
||||
|
||||
# connect to cluster
|
||||
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
|
||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
||||
conf.set("spark.executor.memory", '32g')
|
||||
conf.set("spark.driver.memory", '8g')
|
||||
conf.set("spark.cores.max", "40")
|
||||
scon = SparkContext(conf=conf)
|
||||
|
||||
|
||||
spark = SparkSession \
|
||||
.builder \
|
||||
.appName("Python Spark SQL") \
|
||||
.getOrCreate()
|
||||
@@ -1,141 +1,166 @@
|
||||
#!/usr/bin/env python3
|
||||
from sparkstart import scon, spark
|
||||
from pyspark import SparkContext, rdd
|
||||
from pyspark.sql import SparkSession
|
||||
from pyspark.sql.types import StructType
|
||||
from pyspark.sql.types import StructField
|
||||
from pyspark.sql.types import StringType
|
||||
from pyspark.sql.types import FloatType
|
||||
from pyspark.sql.types import IntegerType
|
||||
|
||||
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, FloatType
|
||||
from pyspark.sql import Row
|
||||
import pyspark.sql.functions as F
|
||||
import re
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
CDC_PATH = "/data/cdc/hourly/"
|
||||
HDFS_HOME = "hdfs://193.174.205.250:54310/"
|
||||
HDFSPATH = "hdfs://193.174.205.250:54310/"
|
||||
GHCNDPATH = HDFSPATH + "ghcnd/"
|
||||
GHCNDHOMEPATH = "/data/ghcnd/"
|
||||
|
||||
|
||||
# a) Stationsdaten einlesen & als Parquet speichern
|
||||
def a(scon, spark, path=CDC_PATH):
|
||||
stationlines = scon.textFile(path + "TU_Stundenwerte_Beschreibung_Stationen.txt")
|
||||
# Aufgabe 9 a
|
||||
|
||||
stationlines = stationlines.zipWithIndex().filter(lambda x: x[1] >= 2).map(lambda x: x[0])
|
||||
|
||||
stationsplitlines = stationlines.map(lambda l: (
|
||||
l[0:5].strip(),
|
||||
l[6:14].strip(),
|
||||
l[15:23].strip(),
|
||||
int(l[24:41].strip()),
|
||||
float(l[42:52].strip()),
|
||||
float(l[53:61].strip()),
|
||||
l[61:101].strip(),
|
||||
l[102:].strip()
|
||||
))
|
||||
def import_data(spark: SparkSession, scon: SparkContext):
|
||||
"""
|
||||
%time import_data(spark, scon)
|
||||
"""
|
||||
|
||||
stationschema = StructType([
|
||||
StructField('stationid', StringType(), True),
|
||||
StructField('from_date', StringType(), True),
|
||||
StructField('to_date', StringType(), True),
|
||||
StructField('height', IntegerType(), True),
|
||||
StructField('latitude', FloatType(), True),
|
||||
StructField('longitude', FloatType(), True),
|
||||
StructField('stationname', StringType(), True),
|
||||
StructField('state', StringType(), True)
|
||||
])
|
||||
|
||||
stationframe = spark.createDataFrame(stationsplitlines, schema=stationschema)
|
||||
|
||||
stationframe.createOrReplaceTempView("cdc_stations")
|
||||
|
||||
outfile = HDFS_HOME + "/home/kramlingermike/" + "cdc_stations.parquet"
|
||||
stationframe.write.mode('overwrite').parquet(outfile)
|
||||
stationframe.cache()
|
||||
|
||||
# a) Beispielabfrage
|
||||
def get_all_cdc_stations(spark):
|
||||
result = spark.sql(f"""
|
||||
SELECT *
|
||||
FROM cdc_stations
|
||||
ORDER BY stationname
|
||||
""")
|
||||
result.show(truncate=False)
|
||||
|
||||
# a) Beispielabfrage
|
||||
def get_cdc_stations_per_state(spark):
|
||||
result = spark.sql(f"""
|
||||
SELECT
|
||||
state,
|
||||
COUNT(*) AS count
|
||||
FROM cdc_stations
|
||||
GROUP BY state
|
||||
ORDER BY count DESC
|
||||
""")
|
||||
result.show(truncate=False)
|
||||
|
||||
def b(scon, spark):
|
||||
lines = scon.textFile(CDC_PATH + "produkt*")
|
||||
# Daten in RDD einlesen
|
||||
rdd_station = scon.textFile("/data/cdc/hourly/TU_Stundenwerte_Beschreibung_Stationen.txt")
|
||||
|
||||
lines = lines.filter(lambda line: not line.startswith("STATIONS_ID"))
|
||||
lines = lines.zipWithIndex().filter(lambda x: x[1] >= 0).map(lambda x: x[0])
|
||||
|
||||
lines = lines.map(lambda l: l.split(";"))
|
||||
|
||||
lines = lines.map(lambda s: (
|
||||
s[0].strip(),
|
||||
s[1].strip()[:8],
|
||||
int(s[1].strip()[8:]),
|
||||
int(s[2].strip()),
|
||||
float(s[3].strip()),
|
||||
float(s[4].strip())
|
||||
))
|
||||
|
||||
schema = StructType([
|
||||
StructField("stationid", StringType(), True),
|
||||
StructField("date", StringType(), True),
|
||||
StructField("hour", IntegerType(), True),
|
||||
StructField("qn_9", IntegerType(), True),
|
||||
StructField("tt_tu", FloatType(), True),
|
||||
StructField("rf_tu", FloatType(), True)
|
||||
])
|
||||
|
||||
|
||||
df = spark.createDataFrame(lines, schema)
|
||||
|
||||
df.createOrReplaceTempView("cdc_hourly")
|
||||
|
||||
outfile = HDFS_HOME + "home/kramlingermike/" + "cdc_hourly.parquet"
|
||||
df.write.mode("overwrite").parquet(outfile)
|
||||
|
||||
def get_hourly_station(spark, stationid, limit=20):
|
||||
result = spark.sql(f"""
|
||||
SELECT *
|
||||
FROM cdc_hourly
|
||||
WHERE stationid = '{stationid}'
|
||||
ORDER BY date, hour
|
||||
LIMIT {limit}
|
||||
""")
|
||||
result.show(truncate=False)
|
||||
|
||||
def avg_temp_per_day(spark, stationid, limit=20):
|
||||
result = spark.sql(f"""
|
||||
SELECT date, ROUND(AVG(tt_tu),2) AS avg_temp
|
||||
FROM cdc_hourly
|
||||
WHERE stationid = '{stationid}'
|
||||
GROUP BY date
|
||||
ORDER BY date
|
||||
LIMIT {limit}
|
||||
""")
|
||||
result.show(truncate=False)
|
||||
# Entfernen der ersten beiden Zeilen (Header und Trennzeile)
|
||||
rdd_station_filterd = (rdd_station
|
||||
.zipWithIndex() # jede Zeile bekommt idx
|
||||
.filter(lambda x: x[1] >= 2) # nur Zeilen mit idx >= 2 behalten
|
||||
.map(lambda x: x[0])) # idx wieder entfernen
|
||||
|
||||
|
||||
rdd_station_splitlines = rdd_station_filterd.map(
|
||||
lambda l: (
|
||||
int(l[:6].strip()), # Station ID
|
||||
l[6:15], # von_datum
|
||||
l[15:24], # bis_datum
|
||||
float(l[24:40].strip()), # stations höhe
|
||||
float(l[40:53].strip()), # geoBreite
|
||||
float(l[53:61].strip()), # geoHöhe
|
||||
l[61:142], # Stationsname
|
||||
l[142:-1] # Bundesland
|
||||
))
|
||||
|
||||
# Datenschema festlegen
|
||||
stationschema = StructType(
|
||||
[
|
||||
StructField("stationId", IntegerType(), True),
|
||||
StructField("von_datum", StringType(), True),
|
||||
StructField("bis_datum", StringType(), True),
|
||||
StructField("hoehe", FloatType(), True),
|
||||
StructField("geo_breite", FloatType(), True),
|
||||
StructField("geo_laenge", FloatType(), True),
|
||||
StructField("station_name", StringType(), True),
|
||||
StructField("bundesland", StringType(), True)
|
||||
]
|
||||
)
|
||||
|
||||
# Data Frame erzeugen
|
||||
stationframe = spark.createDataFrame(rdd_station_splitlines, schema=stationschema)
|
||||
stationframe.printSchema()
|
||||
|
||||
# Temporäre View erzeugen
|
||||
stationframe.createOrReplaceTempView("german_stations")
|
||||
|
||||
# Data Frame in HDFS speichern
|
||||
stationframe.write.mode("overwrite").parquet(
|
||||
HDFSPATH + "home/heiserervalentin/german_stations.parquet"
|
||||
)
|
||||
|
||||
|
||||
|
||||
def read_data_from_parquet(spark):
|
||||
"""
|
||||
read_data_from_parquet(spark)
|
||||
"""
|
||||
df = spark.read.parquet(HDFSPATH + "home/heiserervalentin/german_stations.parquet")
|
||||
df.createOrReplaceTempView("german_stations")
|
||||
df.cache()
|
||||
|
||||
def sql_querys(spark):
|
||||
"""
|
||||
sql_querys(spark)
|
||||
"""
|
||||
spark.sql("SELECT * FROM german_stations").show(5, truncate=False)
|
||||
spark.sql("SELECT COUNT(*) AS Anzahl FROM german_stations").show()
|
||||
spark.sql("SELECT MAX(geo_breite) FROM german_stations").show()
|
||||
df = spark.sql("SELECT * FROM german_stations").toPandas()
|
||||
|
||||
plt.figure(figsize=[6,6])
|
||||
plt.scatter(df.geo_laenge, df.geo_breite, marker='.', color = 'r')
|
||||
|
||||
plt.show()
|
||||
|
||||
|
||||
def import_produkt_files(spark: SparkSession, scon: SparkContext, path='/data/cdc/hourly/'):
|
||||
"""
|
||||
import_produkt_files(spark, scon)
|
||||
"""
|
||||
|
||||
# Daten in RDD einlesen
|
||||
rdd_produkt = scon.textFile(f"{path}/produkt*")
|
||||
|
||||
# Kopfzeile und Leerzeichen filtern
|
||||
rdd_filterd = rdd_produkt \
|
||||
.filter(lambda l: l != 'STATIONS_ID;MESS_DATUM;QN_9;TT_TU;RF_TU;eor') \
|
||||
.map(lambda l: [x.strip() for x in l.split(';')])
|
||||
|
||||
# Zeilen in Felder aufteilen
|
||||
rdd_produkt_splitlines = rdd_filterd.map(
|
||||
lambda l: (
|
||||
int(l[0]), # Stat_id
|
||||
l[1][:8], # Messdatum
|
||||
int(l[1][8:10]), # Messstunde
|
||||
int(l[2]), # Qualitätsniveau
|
||||
float(l[3]), # Lufttemp.
|
||||
float(l[4]), # rel. Luftfeuchte
|
||||
int(l[1][0:4]) # jahr
|
||||
)
|
||||
)
|
||||
|
||||
print(rdd_produkt_splitlines.take(5))
|
||||
|
||||
# Datenschema definieren
|
||||
product_schema = StructType(
|
||||
[
|
||||
StructField("stationId", IntegerType(), True),
|
||||
StructField("date", StringType(), True),
|
||||
StructField("hour", IntegerType(), True),
|
||||
StructField("QN_9", IntegerType(), True),
|
||||
StructField("TT_TU", FloatType(), True),
|
||||
StructField("RF_TU", FloatType(), True),
|
||||
StructField("jahr", IntegerType(), True)
|
||||
]
|
||||
)
|
||||
|
||||
product_frame = spark.createDataFrame(rdd_produkt_splitlines, schema=product_schema)
|
||||
product_frame.printSchema()
|
||||
product_frame.createOrReplaceTempView("german_stations_data")
|
||||
|
||||
|
||||
product_frame.write.mode("overwrite").parquet(
|
||||
HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
|
||||
)
|
||||
|
||||
|
||||
def read_product_data_from_parquet(spark):
|
||||
"""
|
||||
read_product_data_from_parquet(spark)
|
||||
"""
|
||||
df = spark.read.parquet(HDFSPATH + "home/heiserervalentin/german_stations_data.parquet")
|
||||
df.createOrReplaceTempView("german_stations_data")
|
||||
df.cache()
|
||||
|
||||
def main(scon, spark):
|
||||
"""
|
||||
main(scon, spark)
|
||||
"""
|
||||
# Daten importieren
|
||||
import_data(spark, scon)
|
||||
read_data_from_parquet(spark)
|
||||
sql_querys(spark)
|
||||
|
||||
print("a)")
|
||||
a(scon, spark)
|
||||
print("Beispielabfrage: (Alle Stationen:)")
|
||||
get_all_cdc_stations(spark)
|
||||
print("Beispielabfrage: (Alle Stationen pro Bundesland)")
|
||||
get_cdc_stations_per_state(spark)
|
||||
print("b)")
|
||||
b(scon, spark)
|
||||
print("Beispielabfrage: (Alle Daten für eine Station:)")
|
||||
get_hourly_station(spark, "4271")
|
||||
print("Beispielabfrage: (Durchschnittliche Temperatur pro Tag für eine Station:)")
|
||||
avg_temp_per_day(spark, "4271")
|
||||
import_produkt_files(spark, scon)
|
||||
read_product_data_from_parquet(spark)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main(scon, spark)
|
||||
Reference in New Issue
Block a user