Compare commits


7 Commits

SHA1 Message Date
622a228fb7 12 2025-12-12 13:21:24 +01:00
d18e9823e5 add Spark configuration setup 2025-12-11 20:51:00 +01:00
2b26188647 Aufgabe11 2025-12-11 20:46:38 +01:00
f89d39d420 Aufgabe 10 2025-12-11 20:39:21 +01:00
de3782d570 fix 2025-12-04 17:42:03 +01:00
c072850289 bla 2025-11-28 14:43:37 +01:00
296d1c8978 add 9 + 10 2025-11-28 13:23:06 +01:00
7 changed files with 1012 additions and 1004 deletions


@@ -1,209 +1,195 @@
from sparkstart import scon, spark
import ghcnd_stations
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
import time
import matplotlib.pyplot as plt
# a) Scatter plot: all stations (lon/lat)
def plot_all_stations(spark):
q = """
SELECT stationname, latitude, longitude
FROM cdc_stations
WHERE latitude IS NOT NULL AND longitude IS NOT NULL
"""
t0 = time.time()
rows = spark.sql(q).collect()
t1 = time.time()
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Rows: {len(rows)}")
HDFSPATH = "hdfs://193.174.205.250:54310/"
lats = [r['latitude'] for r in rows]
lons = [r['longitude'] for r in rows]
names = [r['stationname'] for r in rows]
def read_parquets(spark: SparkSession):
stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
plt.figure(figsize=(8,6))
plt.scatter(lons, lats, s=10, alpha=0.6)
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('Alle CDC-Stationen (Scatter)')
plt.grid(True)
stations_df = spark.read.parquet(stations_path)
stations_df.createOrReplaceTempView("german_stations")
products_df = spark.read.parquet(products_path)
products_df.createOrReplaceTempView("german_stations_data")
stations_df.cache()
products_df.cache()
def plot_all_stations(spark: SparkSession):
q = "SELECT geo_laenge AS lon, geo_breite AS lat FROM german_stations WHERE geo_laenge IS NOT NULL AND geo_breite IS NOT NULL"
df = spark.sql(q)
pdf = df.toPandas()
plt.figure(figsize=(8, 6))
plt.scatter(pdf.lon, pdf.lat, s=6, color='red', marker='.')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('All Stations (locations)')
plt.tight_layout()
plt.show()
def duration_circle_size(spark: SparkSession):
q = (
"SELECT stationId, geo_laenge AS lon, geo_breite AS lat, "
"(CAST(SUBSTR(bis_datum,1,4) AS INT) - CAST(SUBSTR(von_datum,1,4) AS INT)) AS duration_years "
"FROM german_stations "
"WHERE TRIM(von_datum)<>'' AND TRIM(bis_datum)<>''"
)
df = spark.sql(q)
pdf = df.toPandas()
pdf['duration_years'] = pdf['duration_years'].fillna(0).astype(int)
sizes = (pdf['duration_years'].clip(lower=0) + 1) * 6
plt.figure(figsize=(8, 6))
plt.scatter(pdf.lon, pdf.lat, s=sizes, alpha=0.6, c=pdf['duration_years'], cmap='viridis')
plt.colorbar(label='Duration (years)')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('Stations with duration (years) as marker size')
plt.tight_layout()
plt.show()
def compute_daily_and_yearly_frosts(spark: SparkSession):
q_daily_max = (
"SELECT stationId, date, SUBSTR(CAST(date AS STRING),1,4) AS year, MAX(TT_TU) AS max_temp "
"FROM german_stations_data "
"WHERE TT_TU IS NOT NULL AND TT_TU > -50 AND TT_TU < 60 "
"GROUP BY stationId, date"
)
daily_max = spark.sql(q_daily_max)
daily_max.createOrReplaceTempView('daily_max')
# mark a day as frost if max_temp < 0
q_daily_frost = (
"SELECT stationId, year, CASE WHEN max_temp < 0 THEN 1 ELSE 0 END AS is_frost "
"FROM daily_max"
)
daily_frost = spark.sql(q_daily_frost)
daily_frost.createOrReplaceTempView('daily_frost')
# yearly frostdays per station
q_station_year = (
"SELECT stationId, year, SUM(is_frost) AS frost_days "
"FROM daily_frost GROUP BY stationId, year"
)
station_year_frost = spark.sql(q_station_year)
station_year_frost.createOrReplaceTempView('station_year_frost')
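# Illustrative sketch (hypothetical helper, not required by the task): once the
# station_year_frost view above is registered, it can be sanity-checked directly,
# e.g. by listing the station/year combinations with the most frost days.
def preview_station_year_frost(spark: SparkSession, limit: int = 5) -> None:
    """Show the station/year combinations with the highest frost-day counts."""
    spark.sql(
        f"SELECT stationId, year, frost_days FROM station_year_frost "
        f"ORDER BY frost_days DESC LIMIT {limit}"
    ).show()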
def frost_analysis(spark: SparkSession, year=2024, station_name_matches=('kempten',)):
compute_daily_and_yearly_frosts(spark)
q_hist = (
f"SELECT frost_days, COUNT(*) AS station_count "
f"FROM station_year_frost WHERE year = '{year}' GROUP BY frost_days ORDER BY frost_days"
)
hist_df = spark.sql(q_hist)
hist_pdf = hist_df.toPandas()
if hist_pdf.empty:
print(f"No frost data found for year {year}. Trying to find available years...")
q_all = "SELECT frost_days, COUNT(*) AS station_count FROM station_year_frost GROUP BY frost_days ORDER BY frost_days"
hist_pdf = spark.sql(q_all).toPandas()
if hist_pdf.empty:
print("No frost data available at all. Check if TT_TU column contains valid temperature data.")
return
print(f"Found {len(hist_pdf)} frost day categories across all years")
plt.figure(figsize=(8, 5))
plt.bar(hist_pdf.frost_days, hist_pdf.station_count, color='steelblue')
plt.xlabel('Number of Frost Days in year ' + str(year))
plt.ylabel('Number of Stations')
plt.title(f'Stations vs Frost Days ({year})')
plt.tight_layout()
plt.show()
for name in station_name_matches:
q_find = f"SELECT stationId, station_name FROM german_stations WHERE lower(station_name) LIKE '%{name.lower()}%'"
ids_df = spark.sql(q_find)
ids = ids_df.collect()
if not ids:
print(f"No stations found matching '{name}'")
continue
for r in ids:
sid = r['stationId']
sname = r['station_name']
print(f"Analyzing stationId={sid} name={sname}")
# b) Scatter plot: station duration in years as marker size
def plot_station_duration(spark, size_factor=20):
q = """
SELECT
stationname,
latitude,
longitude,
(CAST(CASE WHEN length(to_date) >= 4 THEN substr(to_date,1,4) ELSE year(current_date()) END AS INT)
- CAST(substr(from_date,1,4) AS INT)) AS years
FROM cdc_stations
WHERE latitude IS NOT NULL AND longitude IS NOT NULL
"""
t0 = time.time()
rows = spark.sql(q).collect()
t1 = time.time()
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Rows: {len(rows)}")
q_ts = (
"SELECT year, frost_days, "
"AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_5, "
"AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) RANGE BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg_20 "
f"FROM station_year_frost WHERE stationId = {sid} ORDER BY CAST(year AS INT)"
)
ts_df = spark.sql(q_ts)
lats = [r['latitude'] for r in rows]
lons = [r['longitude'] for r in rows]
years = [r['years'] if r['years'] is not None else 0 for r in rows]
sizes = [max(5, (y+1) * size_factor) for y in years]
pdf = ts_df.toPandas()
if pdf.empty:
print(f"No yearly frost data for station {sid}")
continue
plt.figure(figsize=(8,6))
plt.scatter(lons, lats, s=sizes, alpha=0.6)
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('CDC-Stationen: Dauer der Verfuegbarkeit (Größe ~ Jahre)')
plt.grid(True)
plt.show()
def plot_frost_distribution_year(spark, year):
q = f"""
WITH daily_max AS (
SELECT stationid, date, MAX(tt_tu) AS max_temp
FROM cdc_hourly
WHERE length(date) >= 8 AND substr(date,1,4) = '{year}'
GROUP BY stationid, date
),
station_frost AS (
SELECT dm.stationid, SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays
FROM daily_max dm
GROUP BY dm.stationid
)
SELECT sf.frostdays, COUNT(*) AS stations
FROM station_frost sf
GROUP BY sf.frostdays
ORDER BY sf.frostdays
"""
t0 = time.time()
rows = spark.sql(q).collect()
t1 = time.time()
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Distinct frostdays: {len(rows)}")
x = [r['frostdays'] for r in rows]
y = [r['stations'] for r in rows]
plt.figure(figsize=(8,5))
plt.bar(x, y)
plt.xlabel('Anzahl Frosttage im Jahr ' + str(year))
plt.ylabel('Anzahl Stationen')
plt.title(f'Verteilung der Frosttage pro Station im Jahr {year}')
plt.grid(True)
plt.show()
pdf['year'] = pdf['year'].astype(int)
plt.figure(figsize=(10, 5))
plt.plot(pdf.year, pdf.frost_days, label='Frostdays (year)', marker='o')
plt.plot(pdf.year, pdf.avg_5, label='5-year avg', linestyle='--')
plt.plot(pdf.year, pdf.avg_20, label='20-year avg', linestyle=':')
plt.xlabel('Year')
plt.ylabel('Frost Days')
plt.title(f'Frost Days over Years for {sname} (station {sid})')
plt.legend()
plt.tight_layout()
plt.show()
# c2) Frost-day time series for one station with 5- and 20-year averages (SQL window)
def plot_station_frost_timeseries(spark, station_name):
q = f"""
WITH daily_max AS (
SELECT stationid, date, MAX(tt_tu) AS max_temp
FROM cdc_hourly
GROUP BY stationid, date
),
yearly AS (
SELECT
dm.stationid,
CAST(substr(dm.date,1,4) AS INT) AS year,
SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays
FROM daily_max dm
GROUP BY dm.stationid, CAST(substr(dm.date,1,4) AS INT)
),
station_yearly AS (
SELECT
y.year,
y.frostdays,
AVG(y.frostdays) OVER (ORDER BY y.year ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg5,
AVG(y.frostdays) OVER (ORDER BY y.year ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg20
FROM yearly y
JOIN cdc_stations s ON y.stationid = s.stationid
WHERE trim(upper(s.stationname)) = '{station_name.upper()}'
ORDER BY y.year
)
SELECT * FROM station_yearly
"""
t0 = time.time()
rows = spark.sql(q).collect()
t1 = time.time()
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Years: {len(rows)}")
def height_frost_correlation(spark: SparkSession):
compute_daily_and_yearly_frosts(spark)
if not rows:
print(f"Keine Daten f\u00fcr Station '{station_name}'.")
return
q_corr = (
"SELECT syf.year AS year, corr(s.hoehe, syf.frost_days) AS height_frost_corr "
"FROM station_year_frost syf JOIN german_stations s ON syf.stationId = s.stationId "
"GROUP BY syf.year ORDER BY CAST(syf.year AS INT)"
)
corr_df = spark.sql(q_corr)
years = [r['year'] for r in rows]
frostdays = [r['frostdays'] for r in rows]
avg5 = [r['avg5'] for r in rows]
avg20 = [r['avg20'] for r in rows]
corr_pdf = corr_df.toPandas()
plt.figure(figsize=(10,5))
plt.plot(years, frostdays, label='Frosttage (Jahr)')
plt.plot(years, avg5, label='5-Jahres-Durchschnitt')
plt.plot(years, avg20, label='20-Jahres-Durchschnitt')
plt.xlabel('Jahr')
plt.ylabel('Anzahl Frosttage')
plt.title(f'Frosttage für Station {station_name}')
plt.legend()
plt.grid(True)
plt.show()
corr_pdf = corr_pdf.dropna(subset=['height_frost_corr'])
if corr_pdf.empty:
print("No non-NaN correlation values found.")
return
corr_pdf['year'] = corr_pdf['year'].astype(int)
plt.figure(figsize=(10, 5))
plt.bar(corr_pdf.year, corr_pdf.height_frost_corr, color='orange')
plt.xlabel('Year')
plt.ylabel('Correlation (height vs frostdays)')
plt.title('Yearly correlation: station height vs number of frost days')
plt.tight_layout()
plt.show()
# d) Correlation of station elevation vs. frost days per year
def plot_height_frost_correlation(spark):
q = """
WITH daily_max AS (
SELECT stationid, date, MAX(tt_tu) AS max_temp
FROM cdc_hourly
GROUP BY stationid, date
),
yearly AS (
SELECT
dm.stationid,
CAST(substr(dm.date,1,4) AS INT) AS year,
SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays
FROM daily_max dm
GROUP BY dm.stationid, CAST(substr(dm.date,1,4) AS INT)
),
joined AS (
SELECT y.year, s.height, y.frostdays
FROM yearly y
JOIN cdc_stations s ON y.stationid = s.stationid
),
yearly_corr AS (
SELECT year, corr(height, frostdays) AS corr
FROM joined
GROUP BY year
ORDER BY year
)
SELECT year, corr FROM yearly_corr WHERE corr IS NOT NULL
"""
t0 = time.time()
rows = spark.sql(q).collect()
t1 = time.time()
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Years with corr: {len(rows)}")
def main(scon, spark):
read_parquets(spark)
if not rows:
print("Keine Korrelationsdaten verfügbar.")
return
plot_all_stations(spark)
years = [r['year'] for r in rows]
corr = [r['corr'] for r in rows]
duration_circle_size(spark)
plt.figure(figsize=(10,5))
plt.bar(years, corr)
plt.xlabel('Jahr')
plt.ylabel('Korrelationskoeffizient (height vs frostdays)')
plt.title('Korrelation Hoehe vs. Frosttage pro Jahr')
plt.grid(True)
plt.show()
frost_analysis(spark, year=2024, station_name_matches=('kempten',))
height_frost_correlation(spark)
if __name__ == '__main__':
ghcnd_stations.read_ghcnd_from_parquet(spark)
main(scon, spark)
plot_all_stations(spark)
plot_station_duration(spark)
plot_frost_distribution_year(spark, '2010')
plot_station_frost_timeseries(spark, 'KEMPTEN')
plot_height_frost_correlation(spark)
pass


@@ -1,689 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Load stations, countries, inventory and data from GHCND as Dataset.
@author: steger
"""
# pylint: disable=pointless-string-statement
import os
from datetime import date
from time import time
from subprocess import call
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DateType
# =============================================
# run sparkstart.py before to create a session
# =============================================
HDFSPATH = "hdfs://193.174.205.250:54310/"
GHCNDPATH = HDFSPATH + "ghcnd/"
GHCNDHOMEPATH = "/data/ghcnd/"
def conv_elevation(elev):
"""
Convert an elevation value.
-999.9 means there is no value.
Parameters
----------
elev : string
The elevation to convert to float.
Returns
-------
res : numeric
The converted value as float.
"""
elev = elev.strip()
if elev == "-999.9":
res = None
else:
res = float(elev)
return res
def conv_data_value(line, start):
"""
Convert a single data value from a dly.- File.
Parameters
----------
line : string
The line with the data value.
start : int
The index at which the value starts.
Returns
-------
res : numeric
The converted data value as int.
"""
return int(line[start:start+5].strip())
def import_ghcnd_stations(scon, spark, path):
"""
Read the station data into a dataframe.
Register it as temporary view and write it to parquet.
Parameters
----------
scon : SparkContext
The spark context.
spark : SparkSession
The SQL session.
Returns
-------
stationFrame : DataFrame
The spark Data Frame with the stations data.
"""
stationlines = scon.textFile(path + "ghcnd-stations.txt")
stationsplitlines = stationlines.map(
lambda l:
(l[0:2],
l[2:3],
l[0:11],
float(l[12:20].strip()),
float(l[21:30].strip()),
conv_elevation(l[31:37]),
l[41:71]
))
stationschema = StructType([
StructField('countrycode', StringType(), True),
StructField('networkcode', StringType(), True),
StructField('stationid', StringType(), True),
StructField('latitude', FloatType(), True),
StructField('longitude', FloatType(), True),
StructField('elevation', FloatType(), True),
StructField('stationname', StringType(), True)
])
stationframe = spark.createDataFrame(stationsplitlines,
schema=stationschema)
stationframe.createOrReplaceTempView("ghcndstations")
stationframe.write.mode('overwrite').parquet(
GHCNDPATH + "ghcndstations.parquet")
stationframe.cache()
print("Imported GhcndStations")
return stationframe
def import_ghcnd_countries(scon, spark, path):
"""
Read the countries data into a dataframe.
Register it as temptable and write it to parquet.
Parameters
----------
scon : SparkContext
The spark context.
spark : SparkSession
The SQL session.
path : string
The path where the file with data resides.
Returns
-------
stationFrame : DataFrame
The spark Data Frame with the countries data.
"""
countrylines = scon.textFile(path + "ghcnd-countries.txt")
countrysplitlines = countrylines.map(lambda l: (l[0:2], l[2:50]))
countryschema = StructType([
StructField('countrycode', StringType(), True),
StructField('countryname', StringType(), True)])
countryframe = spark.createDataFrame(countrysplitlines, countryschema)
countryframe.createOrReplaceTempView("ghcndcountries")
countryframe.write.mode('overwrite').parquet(
GHCNDPATH + "ghcndcountries.parquet")
countryframe.cache()
print("Imported GhcndCountries")
return countryframe
def conv_data_line(line):
"""
Convert a data line from GHCND-Datafile (.dly).
Parameters
----------
line : string
String with a data line containing the values for one month.
Returns
-------
list of tuple
List containing a tuple for each data value.
"""
if line == '':
return []
countrycode = line[0:2]
networkcode = line[2:3]
stationid = line[0:11]
year = int(line[11:15])
month = int(line[15:17])
element = line[17:21]
datlst = []
for i in range(0, 30):
val = conv_data_value(line, 21 + i*8)
if val != -9999:
datlst.append((countrycode, networkcode, stationid,
year, month, i+1,
date(year, month, i+1),
element,
val))
return datlst
def read_dly_file(scon, spark, filename):
"""
Read a .dly-file into a data frame.
Parameters
----------
scon : SparkContext
The spark context.
spark : SparkSession
The SQL session.
filename : string
The name and path of the dly-File.
Returns
-------
RDD
The RDD with the contents of the dly-File.
"""
dly = scon.textFile(filename)
return process_dly_file_lines(spark, dly)
def process_dly_file_lines(spark, lines):
"""
Process the lines of one dly file.
Parameters
----------
spark : SparkSession
The SQL session.
lines : RDD
RDD with one value per line.
Returns
-------
dlyFrame : DataFrame
Data Frame containing the data of the file.
"""
dlsplit = lines.flatMap(conv_data_line)
dlyfileschema = StructType([
StructField('countrycode', StringType(), True),
StructField('networkcode', StringType(), True),
StructField('stationid', StringType(), True),
StructField('year', IntegerType(), True),
StructField('month', IntegerType(), True),
StructField('day', IntegerType(), True),
StructField('date', DateType(), True),
StructField('element', StringType(), True),
StructField('value', IntegerType(), True)
])
dlyframe = spark.createDataFrame(dlsplit, dlyfileschema)
return dlyframe
def import_data_rdd_parallel(scon, spark, path):
"""
Import the data files from ghcnd in parallel.
This is much faster on a cluster or a computer with many cores
and enough main memory to hold all the raw data.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
Returns
-------
None.
"""
rdd = scon.textFile(
path+"/ghcnd_all/*.dly", minPartitions=5000)
rddcoa = rdd.coalesce(5000)
rddsplit = rddcoa.flatMap(conv_data_line)
print("Number of data records = " + str(rddsplit.count()))
print("Number of partitions = " + str(rddsplit.getNumPartitions()))
dlyfileschema = StructType([
StructField('countrycode', StringType(), True),
StructField('networkcode', StringType(), True),
StructField('stationid', StringType(), True),
StructField('year', IntegerType(), True),
StructField('month', IntegerType(), True),
StructField('day', IntegerType(), True),
StructField('date', DateType(), True),
StructField('element', StringType(), True),
StructField('value', IntegerType(), True)
])
dlyframe = spark.createDataFrame(rddsplit, dlyfileschema)
dlyframe.show(10)
dlyframe.write.mode('overwrite').parquet(
GHCNDPATH + "ghcnddata.parquet")
print(os.system("hdfs dfs -du -s /ghcnd/ghcnddata.parquet"))
def import_data_rdd_parallel_whole(scon, spark, path):
"""
Import the data files from ghcnd in parallel.
This is much faster on a cluster or a computer with many cores
and enough main memory to hold all the raw data.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
Returns
-------
None.
"""
rdd = scon.wholeTextFiles(
path+"/ghcnd_all/*.dly", minPartitions=5000 )
rddvals = rdd.values()
print("Number of files in GHCND = " + str(rddvals.count()))
rddlen = rddvals.map(len)
print("Number of characters in all files = " +
str(rddlen.reduce(lambda x, y: x + y)))
rddlines = rddvals.flatMap(lambda x: x.split("\n"))
print("Number of lines with data = " + str(rddlines.count()))
rddsplit = rddlines.flatMap(conv_data_line)
print("Number of data records = " + str(rddsplit.count()))
print("Number of partitions = " + str(rddsplit.getNumPartitions()))
dlyfileschema = StructType([
StructField('countrycode', StringType(), True),
StructField('networkcode', StringType(), True),
StructField('stationid', StringType(), True),
StructField('year', IntegerType(), True),
StructField('month', IntegerType(), True),
StructField('day', IntegerType(), True),
StructField('date', DateType(), True),
StructField('element', StringType(), True),
StructField('value', IntegerType(), True)
])
dlyframe = spark.createDataFrame(rddsplit, dlyfileschema)
dlyframe.show(10)
dlyframe.write.mode('overwrite').parquet(
GHCNDPATH + "ghcnddata.parquet")
print(os.system("hdfs dfs -du -s /ghcnd/ghcnddata.parquet"))
"""
Code for testing problems that resulted finally from empty lines
to solve the problem the code
if line == '':
return []
was added at the beginning of convDataLine to filter away empty lines:
noyear = rddsplit.filter(lambda x: not x[3].isnumeric())
noyear.collect()
rddlines1 = rdd.flatMap(lambda x: [(x[0], y) for y in x[1].split("\n")])
print(rddlines1.count())
rddsplit1 = rddlines1.flatMap(convDataLine1)
print(rddsplit1.count())
noyear1 = rddsplit1.filter(lambda x: not x[1][3].isnumeric())
noyear1.collect()
"""
def import_ghcnd_files_extern(scon, spark, path, stationlist, batchsize,
numparts):
"""
Import multiple data files in one batch.
Import batchsize data files in one batch and append the data into
the parquet file.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
path : string
Path of the data files.
stationlist : list
List of all stations to load.
batchsize : int
Number of files to load in one batch.
numparts : int
Number of partitions to write one batch.
Returns
-------
None.
"""
data = None
count = 0
allcount = 0
batchcount = 0
for station in stationlist:
# filename = "file://" + path + "/" + station + ".dly"
filename = path + station + ".dly"
if os.path.isfile(filename):
dly = read_dly_file(scon, spark, "file://" + filename)
if data is not None:
data = data.union(dly)
print("Batch " + str(batchcount) +
" Filenr " + str(count) + " Processing " + filename)
else:
tstart = time()
data = dly
count += 1
if count >= batchsize:
# data = data.sort('countrycode', 'stationid', 'date')
data = data.coalesce(numparts)
tcoalesce = time()
data.write.mode('Append').parquet(
GHCNDPATH + "ghcnddata.parquet")
anzrec = data.count()
twrite = time()
print(
"\n\nBatch " + str(batchcount) +
" #recs " + str(anzrec) +
" #files " + str(allcount) +
" readtime " + str.format("{:f}", tcoalesce - tstart) +
" writetime " + str.format("{:f}", twrite - tcoalesce) +
" recs/sec " +
str.format("{:f}", anzrec / (twrite - tstart)) + "\n\n")
allcount += count
count = 0
batchcount += 1
data = None
else:
print("importGhcndFilesExtern: " + station +
", " + filename + " not found")
if data is not None:
data = data.coalesce(numparts)
data.write.mode('Append').parquet(GHCNDPATH + "ghcnddata.parquet")
def import_all_data(scon, spark, path):
"""
Import all data from GHCND.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
path : string
Path of data files.
Returns
-------
None.
"""
stationlist = spark.sql(
"SELECT stationid AS station \
FROM ghcndstations \
ORDER BY station")
pds = stationlist.toPandas()
import_ghcnd_files_extern(scon, spark, path + "ghcnd_all/",
pds.station, 30, 1)
def import_data_single_files(scon, spark, stationlist, parquetname, path):
"""
Import the data files one by one.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
stationlist : list
List of all stations to import data.
parquetname : string
Name of the parquet file to write the data to.
path : string
Path where the data files reside.
Returns
-------
None.
"""
pds = stationlist.toPandas()
cnt = 0
for station in pds.station:
filename = path + station + ".dly"
if os.path.isfile(filename):
start = time()
dly = read_dly_file(scon, spark, "file://" + filename)
numrec = dly.count()
dly = dly.coalesce(1).sort('element', 'date')
read = time()
dly.write.mode('Append').parquet(GHCNDPATH
+ parquetname + ".parquet")
finish = time()
print(str.format(
"{:8d} ", cnt) + station +
" #rec " + str.format("{:7d}", numrec) +
" read " + str.format("{:f}", read - start) +
" write " + str.format("{:f}", finish - read) +
" write/sec " + str.format("importDataSingleFiles{:f} ",
numrec/(finish - read))
+ " " + filename)
else:
print("#### " + str(cnt) + " File " +
filename + " does not exist ####")
cnt += 1
def check_files(spark):
"""
Check whether any files are missing for the generated station names.
Parameters
----------
spark : SparkSession
The SQL session.
Returns
-------
None.
"""
stationlist = spark.sql(
"SELECT CONCAT(countrycode, networkcode, stationid) AS station \
FROM ghcndstations \
ORDER BY station")
pds = stationlist.toPandas()
count = 1
for station in pds.station:
filename = "/nfs/home/steger/ghcnd/ghcnd_all/" + station + ".dly"
if os.path.isfile(filename):
# print(str(count) + " " + station)
pass
else:
print(str(count) + " File does not exist: " + filename)
count += 1
"""
Read the inventory data into a dataframe,
register it as temporary view and write it to parquet
"""
def import_ghcnd_inventory(scon, spark, path):
"""
Import inventory information from GHCND.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
path : string
Path for inventory file.
Returns
-------
invframe : DataFrame
Data Frame with inventory data.
"""
invlines = scon.textFile(path + "ghcnd-inventory.txt")
invsplitlines = invlines.map(
lambda l:
(l[0:2],
l[2:3],
l[0:11],
float(l[12:20].strip()),
float(l[21:30].strip()),
l[31:35],
int(l[36:40]),
int(l[41:45])
))
invschema = StructType([
StructField('countrycode', StringType(), True),
StructField('networkcode', StringType(), True),
StructField('stationid', StringType(), True),
StructField('latitude', FloatType(), True),
StructField('longitude', FloatType(), True),
StructField('element', StringType(), True),
StructField('firstyear', IntegerType(), True),
StructField('lastyear', IntegerType(), True)
])
invframe = spark.createDataFrame(invsplitlines, invschema)
invframe.createOrReplaceTempView("ghcndinventory")
invframe.write.mode('overwrite').parquet(
GHCNDPATH + "ghcndinventory.parquet")
invframe.cache()
print("Imported GhcndInventory")
return invframe
def import_ghcnd_all(scon, spark):
"""
Import all files from GHCND.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
Returns
-------
None.
"""
localfilepath = "file://" + GHCNDHOMEPATH
import_ghcnd_countries(scon, spark, localfilepath)
import_ghcnd_stations(scon, spark, localfilepath)
import_ghcnd_inventory(scon, spark, localfilepath)
# import_all_data(scon, spark, GHCNDHOMEPATH)
import_data_rdd_parallel(scon, spark, localfilepath)
def read_ghcnd_from_parquet(spark):
"""
Read all data from the parquet files into Dataframes.
Create temporary views from the parquet files.
Parameters
----------
spark : SparkSession
The SQL Session.
Returns
-------
None.
"""
dfcountries = spark.read.parquet(GHCNDPATH + "ghcndcountries.parquet")
dfcountries.createOrReplaceTempView("ghcndcountries")
dfcountries.cache()
dfstations = spark.read.parquet(GHCNDPATH + "ghcndstations.parquet")
dfstations.createOrReplaceTempView("ghcndstations")
dfstations.cache()
dfinventory = spark.read.parquet(GHCNDPATH + "ghcndinventory.parquet")
dfinventory.createOrReplaceTempView("ghcndinventory")
dfinventory.cache()
dfdata = spark.read.parquet(GHCNDPATH + "ghcnddata.parquet")
dfdata.createOrReplaceTempView("ghcnddata")
dfdata.cache()
def delete_all_parquet_ghcnd():
"""
Delete all parquet files that were imported from GHCND.
Returns
-------
None.
"""
delete_from_hdfs(GHCNDPATH + "ghcndstations.parquet")
delete_from_hdfs(GHCNDPATH + "ghcndcountries.parquet")
delete_from_hdfs(GHCNDPATH + "ghcndinventory.parquet")
delete_from_hdfs(GHCNDPATH + "ghcnddata.parquet")
def delete_from_hdfs(path):
"""
Delete the file in path from HDFS.
Parameters
----------
path : string
Path of the file in HDFS.
Returns
-------
None.
"""
call("hdfs dfs -rm -R " + path,
shell=True)

Aufgabe 11/Aufgabe11.py (new file, 366 lines)

@@ -0,0 +1,366 @@
from __future__ import annotations
from sparkstart import scon, spark
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
HDFSPATH = "hdfs://193.174.205.250:54310/"
def read_parquet_tables(spark: SparkSession) -> None:
"""Load station master data and hourly measurements from parquet if needed."""
stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
stations_df = spark.read.parquet(stations_path)
stations_df.createOrReplaceTempView("german_stations")
stations_df.cache()
products_df = spark.read.parquet(products_path)
products_df.createOrReplaceTempView("german_stations_data")
products_df.cache()
def _escape_like(value: str) -> str:
"""Escape single quotes for safe SQL literal usage."""
return value.replace("'", "''")
def resolve_station_id(spark: SparkSession, station_identifier) -> int:
"""Resolve station id either from int input or fuzzy name search."""
if isinstance(station_identifier, int):
return station_identifier
if isinstance(station_identifier, str) and station_identifier.strip().isdigit():
return int(station_identifier.strip())
if isinstance(station_identifier, str):
needle = _escape_like(station_identifier.lower())
q = (
"SELECT stationId FROM german_stations "
f"WHERE lower(station_name) LIKE '%{needle}%' ORDER BY station_name LIMIT 1"
)
result = spark.sql(q).collect()
if not result:
raise ValueError(f"No station found for pattern '{station_identifier}'")
return int(result[0]["stationId"])
raise ValueError("station_identifier must be int or str")
def build_station_rollup_for_station(spark: SparkSession, station_identifier) -> None:
"""Create rollup view with min/max/avg per hour/day/month/quarter/year."""
station_id = resolve_station_id(spark, station_identifier)
q = f"""
WITH base AS (
SELECT
d.stationId,
gs.station_name,
TO_TIMESTAMP(CONCAT(d.date, LPAD(CAST(d.hour AS STRING), 2, '0')), 'yyyyMMddHH') AS hour_ts,
TO_DATE(d.date, 'yyyyMMdd') AS day_date,
MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS month_in_year,
QUARTER(TO_DATE(d.date, 'yyyyMMdd')) AS quarter_in_year,
YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS year_value,
d.TT_TU AS temperature
FROM german_stations_data d
JOIN german_stations gs ON d.stationId = gs.stationId
WHERE d.stationId = {station_id}
AND d.TT_TU IS NOT NULL
AND d.TT_TU <> -999
),
rollup_base AS (
SELECT
stationId,
station_name,
hour_ts,
day_date,
month_in_year,
quarter_in_year,
year_value,
MIN(temperature) AS min_temp,
MAX(temperature) AS max_temp,
AVG(temperature) AS avg_temp
FROM base
GROUP BY stationId, station_name, ROLLUP(year_value, quarter_in_year, month_in_year, day_date, hour_ts)
)
SELECT
stationId,
station_name,
hour_ts,
day_date,
month_in_year,
quarter_in_year,
year_value,
CASE WHEN month_in_year IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-', LPAD(CAST(month_in_year AS STRING), 2, '0'), '-01')) END AS month_start_date,
CASE WHEN quarter_in_year IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-', LPAD(CAST(quarter_in_year * 3 - 2 AS STRING), 2, '0'), '-01')) END AS quarter_start_date,
CASE WHEN year_value IS NOT NULL THEN TO_DATE(CONCAT(CAST(year_value AS STRING), '-01-01')) END AS year_start_date,
min_temp,
max_temp,
avg_temp
FROM rollup_base
"""
rollup_df = spark.sql(q)
rollup_df.cache()
rollup_df.createOrReplaceTempView("station_rollup")
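# Illustrative sketch (hypothetical helper): ROLLUP emits one row per
# aggregation level, with the finer-grained columns set to NULL. A single level
# can therefore be selected by filtering on those NULLs, mirroring the filters
# used in plot_station_rollup_levels below.
def preview_yearly_rollup(spark: SparkSession, station_id: int) -> None:
    """Show per-year min/avg/max temperatures from the station_rollup view."""
    spark.sql(
        f"""
        SELECT year_value, min_temp, avg_temp, max_temp
        FROM station_rollup
        WHERE stationId = {station_id}
          AND quarter_in_year IS NULL
          AND year_value IS NOT NULL
        ORDER BY year_value
        """
    ).show()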
def _year_window(spark: SparkSession, years_back: int, station_id: int) -> tuple[int, int] | None:
stats = spark.sql(
f"SELECT MIN(year_value) AS min_year, MAX(year_value) AS max_year FROM station_rollup WHERE year_value IS NOT NULL AND stationId = {station_id}"
).collect()
if not stats or stats[0]["max_year"] is None:
return None
min_year = int(stats[0]["min_year"])
max_year = int(stats[0]["max_year"])
start_year = max(min_year, max_year - years_back + 1)
return start_year, max_year
def plot_station_rollup_levels(
spark: SparkSession,
station_identifier,
day_span_years: int = 3,
agg_span_years: int = 15,
) -> None:
"""Plot day, month, quarter, and year aggregates for the given station."""
station_id = resolve_station_id(spark, station_identifier)
needs_refresh = not spark.catalog.tableExists("station_rollup")
if not needs_refresh:
count = spark.sql(
f"SELECT COUNT(*) AS cnt FROM station_rollup WHERE stationId = {station_id}"
).collect()[0]["cnt"]
needs_refresh = count == 0
if needs_refresh:
build_station_rollup_for_station(spark, station_id)
day_window = _year_window(spark, day_span_years, station_id)
if day_window is None:
print("No data available for plotting")
return
month_window = _year_window(spark, agg_span_years, station_id)
if month_window is None:
print("No aggregated window available")
return
def _plot(query: str, figure_idx: int, title: str, x_col: str = "bucket_date") -> None:
pdf = spark.sql(query).toPandas()
if pdf.empty:
print(f"No data for {title}")
return
plt.figure(num=figure_idx)
plt.clf()
metrics = [
("min_temp", "Min", "#1f77b4"),
("avg_temp", "Avg", "#ff7f0e"),
("max_temp", "Max", "#2ca02c"),
]
for col, label, color in metrics:
if col in pdf:
plt.plot(pdf[x_col], pdf[col], label=label, color=color)
plt.title(title)
plt.xlabel("Datum")
plt.ylabel("Temperatur (°C)")
plt.legend()
plt.tight_layout()
plt.show()
day_start, day_end = day_window
q_day = f"""
SELECT day_date AS bucket_date, min_temp, avg_temp, max_temp
FROM station_rollup
WHERE stationId = {station_id}
AND hour_ts IS NULL
AND day_date IS NOT NULL
AND year_value BETWEEN {day_start} AND {day_end}
ORDER BY bucket_date
"""
_plot(q_day, 1, f"Tagesmittelwerte {day_start}-{day_end}")
agg_start, agg_end = month_window
q_month = f"""
SELECT month_start_date AS bucket_date, min_temp, avg_temp, max_temp
FROM station_rollup
WHERE stationId = {station_id}
AND day_date IS NULL
AND month_in_year IS NOT NULL
AND year_value BETWEEN {agg_start} AND {agg_end}
ORDER BY bucket_date
"""
_plot(q_month, 2, f"Monatsmittelwerte {agg_start}-{agg_end}")
q_quarter = f"""
SELECT quarter_start_date AS bucket_date, min_temp, avg_temp, max_temp
FROM station_rollup
WHERE stationId = {station_id}
AND month_in_year IS NULL
AND quarter_in_year IS NOT NULL
AND year_value BETWEEN {agg_start} AND {agg_end}
ORDER BY bucket_date
"""
_plot(q_quarter, 3, f"Quartalsmittelwerte {agg_start}-{agg_end}")
q_year = f"""
SELECT year_start_date AS bucket_date, min_temp, avg_temp, max_temp
FROM station_rollup
WHERE stationId = {station_id}
AND quarter_in_year IS NULL
AND year_value IS NOT NULL
ORDER BY bucket_date
"""
_plot(q_year, 4, "Jahresmittelwerte")
def create_tempmonat(spark: SparkSession) -> None:
"""Create cached temp table tempmonat with monthly aggregates per station."""
q = """
SELECT
d.stationId,
gs.station_name,
YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS year_value,
MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS month_value,
MIN(d.TT_TU) AS min_temp,
MAX(d.TT_TU) AS max_temp,
AVG(d.TT_TU) AS avg_temp
FROM german_stations_data d
JOIN german_stations gs ON d.stationId = gs.stationId
WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999
GROUP BY d.stationId, gs.station_name, YEAR(TO_DATE(d.date, 'yyyyMMdd')), MONTH(TO_DATE(d.date, 'yyyyMMdd'))
"""
monthly_df = spark.sql(q)
monthly_df.cache()
monthly_df.createOrReplaceTempView("tempmonat")
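# Illustrative sketch (hypothetical helper): tempmonat can also be queried
# directly, e.g. to list the coldest station/month combinations by average
# temperature, independent of the ranking queries below.
def preview_coldest_months(spark: SparkSession, limit: int = 10) -> None:
    """Show the coldest monthly averages across all stations and years."""
    spark.sql(
        f"""
        SELECT station_name, year_value, month_value, avg_temp
        FROM tempmonat
        ORDER BY avg_temp ASC
        LIMIT {limit}
        """
    ).show(truncate=False)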
def rank_coldest_per_month_2015(spark: SparkSession):
"""Rank stations by coldest values per month for 2015 using tempmonat."""
return spark.sql(
"""
SELECT
stationId,
station_name,
year_value,
month_value,
min_temp,
max_temp,
avg_temp,
RANK() OVER (PARTITION BY month_value ORDER BY min_temp ASC) AS rank_min,
RANK() OVER (PARTITION BY month_value ORDER BY max_temp ASC) AS rank_max,
RANK() OVER (PARTITION BY month_value ORDER BY avg_temp ASC) AS rank_avg
FROM tempmonat
WHERE year_value = 2015
ORDER BY rank_min, month_value
"""
)
def rank_coldest_overall(spark: SparkSession):
"""Rank stations by coldest values over all months/years (no partition)."""
return spark.sql(
"""
SELECT
stationId,
station_name,
year_value,
month_value,
min_temp,
max_temp,
avg_temp,
RANK() OVER (ORDER BY min_temp ASC) AS rank_min,
RANK() OVER (ORDER BY max_temp ASC) AS rank_max,
RANK() OVER (ORDER BY avg_temp ASC) AS rank_avg
FROM tempmonat
ORDER BY rank_min
"""
)
def create_grouping_sets_overview(spark: SparkSession) -> None:
"""Compute grouping sets for requested aggregations and cache the result."""
q = """
WITH base AS (
SELECT
YEAR(TO_DATE(d.date, 'yyyyMMdd')) AS year_value,
MONTH(TO_DATE(d.date, 'yyyyMMdd')) AS month_value,
gs.bundesland,
gs.stationId,
gs.station_name,
d.TT_TU AS temperature
FROM german_stations_data d
JOIN german_stations gs ON d.stationId = gs.stationId
WHERE d.TT_TU IS NOT NULL AND d.TT_TU <> -999
)
SELECT
year_value,
month_value,
bundesland,
stationId,
station_name,
MIN(temperature) AS min_temp,
MAX(temperature) AS max_temp,
AVG(temperature) AS avg_temp
FROM base
GROUP BY GROUPING SETS (
(year_value, bundesland),
(year_value, stationId, station_name, bundesland),
(month_value, bundesland)
)
"""
grouped_df = spark.sql(q)
grouped_df.cache()
grouped_df.createOrReplaceTempView("grouping_sets_stats")
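# Illustrative sketch (hypothetical helper): every GROUPING SET leaves the
# columns it does not group by as NULL, so the cached view can be checked by
# counting rows per NULL pattern (one pattern per requested grouping set).
def preview_grouping_set_sizes(spark: SparkSession) -> None:
    """Count rows per grouping-set signature in grouping_sets_stats."""
    spark.sql(
        """
        SELECT
            (year_value IS NOT NULL) AS has_year,
            (month_value IS NOT NULL) AS has_month,
            (stationId IS NOT NULL) AS has_station,
            COUNT(*) AS row_count
        FROM grouping_sets_stats
        GROUP BY (year_value IS NOT NULL), (month_value IS NOT NULL), (stationId IS NOT NULL)
        ORDER BY row_count DESC
        """
    ).show()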
def select_year_bundesland(spark: SparkSession):
return spark.sql(
"""
SELECT year_value, bundesland, min_temp, max_temp, avg_temp
FROM grouping_sets_stats
WHERE bundesland IS NOT NULL AND month_value IS NULL AND stationId IS NULL
ORDER BY year_value, bundesland
"""
)
def select_year_station(spark: SparkSession):
return spark.sql(
"""
SELECT year_value, stationId, station_name, min_temp, max_temp, avg_temp
FROM grouping_sets_stats
WHERE stationId IS NOT NULL AND month_value IS NULL
ORDER BY year_value, stationId
"""
)
def select_month_bundesland(spark: SparkSession):
return spark.sql(
"""
SELECT month_value, bundesland, min_temp, max_temp, avg_temp
FROM grouping_sets_stats
WHERE month_value IS NOT NULL AND year_value IS NULL
ORDER BY month_value, bundesland
"""
)
def main(scon, spark):
read_parquet_tables(spark)
build_station_rollup_for_station(spark, "kempten")
plot_station_rollup_levels(spark, "kempten")
create_tempmonat(spark)
print("Rangfolgen 2015 je Monat:")
rank_coldest_per_month_2015(spark).show(36, truncate=False)
print("Rangfolgen gesamt:")
rank_coldest_overall(spark).show(36, truncate=False)
create_grouping_sets_overview(spark)
print("Jahr vs Bundesland:")
select_year_bundesland(spark).show(20, truncate=False)
print("Jahr vs Station:")
select_year_station(spark).show(20, truncate=False)
print("Monat vs Bundesland:")
select_month_bundesland(spark).show(20, truncate=False)
if __name__ == "__main__":
main(scon, spark)

Aufgabe 11/sparkstart.py (new file, 22 lines)

@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
Create a Spark configuration.
"""
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# connect to cluster
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.executor.memory", '32g')
conf.set("spark.driver.memory", '8g')
conf.set("spark.cores.max", "40")
scon = SparkContext(conf=conf)
spark = SparkSession \
.builder \
.appName("Python Spark SQL") \
.getOrCreate()
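# Illustrative usage sketch (not part of the original file): the analysis
# scripts import the objects created above via "from sparkstart import scon,
# spark" and could release the cluster resources explicitly when finished.
def stop_spark() -> None:
    """Stop the SparkSession; this also stops the underlying SparkContext."""
    spark.stop()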

Aufgabe 12/Aufgabe12.py (new file, 276 lines)

@@ -0,0 +1,276 @@
from __future__ import annotations
from typing import Iterable, Sequence
from pyspark.sql import SparkSession, functions as F, types as T
from sparkstart import scon, spark
HDFSPATH = "hdfs://193.174.205.250:54310/"
_DATE_FALLBACK_EXPR = "COALESCE(date_value, TO_DATE(date_str), TO_DATE(date_str, 'yyyyMMdd'))"
def _resolve_column_name(columns: Sequence[str], candidates: Iterable[str]) -> str:
lowered = {col.lower(): col for col in columns}
for candidate in candidates:
match = lowered.get(candidate.lower())
if match:
return match
raise ValueError(f"None of the candidate columns {list(candidates)} exist in {columns}")
def _normalize_stocks_view(spark: SparkSession) -> None:
stocks_path = HDFSPATH + "stocks/stocks.parquet"
stocks_df = spark.read.parquet(stocks_path)
symbol_col = _resolve_column_name(stocks_df.columns, ("symbol", "ticker"))
date_col = _resolve_column_name(stocks_df.columns, ("date", "pricedate", "dt"))
close_col = _resolve_column_name(stocks_df.columns, ("close", "closeprice", "closingprice"))
stocks_df = (
stocks_df
.select(
F.col(symbol_col).alias("symbol"),
F.col(date_col).alias("raw_date"),
F.col(close_col).alias("close_raw"),
)
.withColumn("date_str", F.col("raw_date").cast("string"))
)
date_candidates = [
F.col("raw_date").cast("date"),
F.to_date("raw_date"),
F.to_date("date_str"),
F.to_date("date_str", "yyyyMMdd"),
F.to_date("date_str", "MM/dd/yyyy"),
]
stocks_df = (
stocks_df
.withColumn("date_value", F.coalesce(*date_candidates))
.withColumn("year_value", F.substring("date_str", 1, 4).cast("int"))
.withColumn("close_value", F.col("close_raw").cast("double"))
.select("symbol", "date_value", "date_str", "year_value", "close_value")
)
stocks_df.cache()
stocks_df.createOrReplaceTempView("stocks_enriched")
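# Illustrative sketch (hypothetical helper): a quick check that the date
# fallbacks above actually parsed the raw dates, counting per year how many
# rows ended up without a usable date_value.
def preview_stocks_enriched(spark: SparkSession) -> None:
    """Count rows and unparsed dates per year in stocks_enriched."""
    spark.sql(
        """
        SELECT
            year_value,
            COUNT(*) AS rows_total,
            SUM(CASE WHEN date_value IS NULL THEN 1 ELSE 0 END) AS rows_without_date
        FROM stocks_enriched
        GROUP BY year_value
        ORDER BY year_value
        """
    ).show()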
def _pick_first_numeric_field(fields: Sequence[T.StructField]) -> str:
numeric_types = (
T.ByteType,
T.ShortType,
T.IntegerType,
T.LongType,
T.FloatType,
T.DoubleType,
T.DecimalType,
)
for field in fields:
if isinstance(field.dataType, numeric_types):
return field.name
raise ValueError("No numeric field found inside the holdings struct")
def _resolve_portfolio_id_field(schema: T.StructType) -> str:
priority = ("portfolio_id", "portfolioid", "id")
lowered = {field.name.lower(): field.name for field in schema.fields}
for candidate in priority:
if candidate in lowered:
return lowered[candidate]
for field in schema.fields:
if not isinstance(field.dataType, (T.ArrayType, T.MapType)):
return field.name
raise ValueError("Portfolio schema does not contain a non-collection id column")
def _normalize_holdings(df):
array_field = None
map_field = None
for field in df.schema.fields:
if isinstance(field.dataType, T.ArrayType) and isinstance(field.dataType.elementType, T.StructType):
array_field = field
break
if isinstance(field.dataType, T.MapType) and isinstance(field.dataType.keyType, T.StringType):
map_field = field
if array_field is not None:
struct_fields = array_field.dataType.elementType.fields
symbol_field = _resolve_column_name([f.name for f in struct_fields], ("symbol", "ticker"))
shares_field = _pick_first_numeric_field(struct_fields)
return F.expr(
f"transform(`{array_field.name}`, x -> named_struct('symbol', x.`{symbol_field}`, 'shares', CAST(x.`{shares_field}` AS DOUBLE)))"
)
if map_field is not None and isinstance(map_field.dataType.valueType, (T.IntegerType, T.LongType, T.FloatType, T.DoubleType, T.DecimalType)):
return F.expr(
f"transform(map_entries(`{map_field.name}`), x -> named_struct('symbol', x.key, 'shares', CAST(x.value AS DOUBLE)))"
)
raise ValueError("Could not locate holdings column (array<struct> or map) in portfolio data")
def _normalize_portfolio_view(spark: SparkSession) -> None:
portfolio_path = HDFSPATH + "stocks/portfolio.parquet"
portfolio_df = spark.read.parquet(portfolio_path)
id_col = _resolve_portfolio_id_field(portfolio_df.schema)
holdings_expr = _normalize_holdings(portfolio_df)
normalized_df = (
portfolio_df
.select(
F.col(id_col).alias("portfolio_id"),
holdings_expr.alias("holdings"),
)
)
normalized_df.cache()
normalized_df.createOrReplaceTempView("portfolio")
spark.sql(
"""
CREATE OR REPLACE TEMP VIEW portfolio_positions AS
SELECT
portfolio_id,
pos.symbol AS symbol,
pos.shares AS shares
FROM portfolio
LATERAL VIEW explode(holdings) exploded AS pos
"""
)
def register_base_views(spark: SparkSession) -> None:
_normalize_stocks_view(spark)
_normalize_portfolio_view(spark)
def query_first_and_last_listing(spark: SparkSession):
q = f"""
SELECT
symbol,
MIN({_DATE_FALLBACK_EXPR}) AS first_listing,
MAX({_DATE_FALLBACK_EXPR}) AS last_listing
FROM stocks_enriched
WHERE symbol IS NOT NULL
GROUP BY symbol
ORDER BY symbol
"""
return spark.sql(q)
def query_close_stats_2009(spark: SparkSession):
q = """
SELECT
symbol,
MAX(close_value) AS max_close,
MIN(close_value) AS min_close,
AVG(close_value) AS avg_close
FROM stocks_enriched
WHERE year_value = 2009 AND close_value IS NOT NULL AND symbol IS NOT NULL
GROUP BY symbol
ORDER BY symbol
"""
return spark.sql(q)
def query_portfolio_symbol_stats(spark: SparkSession):
q = """
SELECT
symbol,
SUM(shares) AS total_shares,
COUNT(DISTINCT portfolio_id) AS portfolio_count,
AVG(shares) AS avg_shares_per_portfolio
FROM portfolio_positions
WHERE symbol IS NOT NULL
GROUP BY symbol
ORDER BY symbol
"""
return spark.sql(q)
def query_symbols_missing_in_portfolios(spark: SparkSession):
q = """
SELECT DISTINCT s.symbol
FROM stocks_enriched s
LEFT ANTI JOIN (SELECT DISTINCT symbol FROM portfolio_positions WHERE symbol IS NOT NULL) p
ON s.symbol = p.symbol
WHERE s.symbol IS NOT NULL
ORDER BY s.symbol
"""
return spark.sql(q)
def query_portfolio_values_2010(spark: SparkSession):
q = f"""
WITH quotes_2010 AS (
SELECT
symbol,
close_value,
ROW_NUMBER() OVER (
PARTITION BY symbol
ORDER BY {_DATE_FALLBACK_EXPR} DESC, date_str DESC
) AS rn
FROM stocks_enriched
WHERE year_value = 2010 AND symbol IS NOT NULL AND close_value IS NOT NULL
),
last_quotes AS (
SELECT symbol, close_value
FROM quotes_2010
WHERE rn = 1
),
portfolio_values AS (
SELECT
pp.portfolio_id,
SUM(pp.shares * lq.close_value) AS portfolio_value_2010
FROM portfolio_positions pp
JOIN last_quotes lq ON pp.symbol = lq.symbol
GROUP BY pp.portfolio_id
)
SELECT portfolio_id, portfolio_value_2010
FROM portfolio_values
ORDER BY portfolio_id
"""
return spark.sql(q)
def main(scon, spark):
register_base_views(spark)
print("(a) Erste und letzte Notierung je Symbol:")
query_first_and_last_listing(spark).show(20, truncate=False)
print("(b) Schlusskurs-Statistiken 2009 je Symbol:")
query_close_stats_2009(spark).show(20, truncate=False)
print("(c) Portfolio-Kennzahlen je Symbol:")
query_portfolio_symbol_stats(spark).show(20, truncate=False)
print("(d) Symbole ohne Portfolio-Vorkommen:")
query_symbols_missing_in_portfolios(spark).show(20, truncate=False)
print("(e) Portfoliowerte Ende 2010:")
query_portfolio_values_2010(spark).show(20, truncate=False)
if __name__ == "__main__":
main(scon, spark)

Aufgabe 12/sparkstart.py (new file, 22 lines)

@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
Create a Spark configuration.
"""
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# connect to cluster
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.executor.memory", '32g')
conf.set("spark.driver.memory", '8g')
conf.set("spark.cores.max", "40")
scon = SparkContext(conf=conf)
spark = SparkSession \
.builder \
.appName("Python Spark SQL") \
.getOrCreate()


@@ -1,141 +1,166 @@
#!/usr/bin/env python3
from sparkstart import scon, spark
from pyspark import SparkContext, rdd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, FloatType
from pyspark.sql import Row
import pyspark.sql.functions as F
import re
import matplotlib.pyplot as plt
CDC_PATH = "/data/cdc/hourly/"
HDFS_HOME = "hdfs://193.174.205.250:54310/"
HDFSPATH = "hdfs://193.174.205.250:54310/"
GHCNDPATH = HDFSPATH + "ghcnd/"
GHCNDHOMEPATH = "/data/ghcnd/"
# a) Read the station data & store it as parquet
def a(scon, spark, path=CDC_PATH):
stationlines = scon.textFile(path + "TU_Stundenwerte_Beschreibung_Stationen.txt")
# Aufgabe 9 a
stationlines = stationlines.zipWithIndex().filter(lambda x: x[1] >= 2).map(lambda x: x[0])
stationsplitlines = stationlines.map(lambda l: (
l[0:5].strip(),
l[6:14].strip(),
l[15:23].strip(),
int(l[24:41].strip()),
float(l[42:52].strip()),
float(l[53:61].strip()),
l[61:101].strip(),
l[102:].strip()
))
def import_data(spark: SparkSession, scon: SparkContext):
"""
%time import_data(spark, scon)
"""
stationschema = StructType([
StructField('stationid', StringType(), True),
StructField('from_date', StringType(), True),
StructField('to_date', StringType(), True),
StructField('height', IntegerType(), True),
StructField('latitude', FloatType(), True),
StructField('longitude', FloatType(), True),
StructField('stationname', StringType(), True),
StructField('state', StringType(), True)
])
stationframe = spark.createDataFrame(stationsplitlines, schema=stationschema)
stationframe.createOrReplaceTempView("cdc_stations")
outfile = HDFS_HOME + "/home/kramlingermike/" + "cdc_stations.parquet"
stationframe.write.mode('overwrite').parquet(outfile)
stationframe.cache()
# a) Example query
def get_all_cdc_stations(spark):
result = spark.sql(f"""
SELECT *
FROM cdc_stations
ORDER BY stationname
""")
result.show(truncate=False)
# a) Example query
def get_cdc_stations_per_state(spark):
result = spark.sql(f"""
SELECT
state,
COUNT(*) AS count
FROM cdc_stations
GROUP BY state
ORDER BY count DESC
""")
result.show(truncate=False)
def b(scon, spark):
lines = scon.textFile(CDC_PATH + "produkt*")
# Read the data into an RDD
rdd_station = scon.textFile("/data/cdc/hourly/TU_Stundenwerte_Beschreibung_Stationen.txt")
lines = lines.filter(lambda line: not line.startswith("STATIONS_ID"))
lines = lines.zipWithIndex().filter(lambda x: x[1] >= 0).map(lambda x: x[0])
lines = lines.map(lambda l: l.split(";"))
lines = lines.map(lambda s: (
s[0].strip(),
s[1].strip()[:8],
int(s[1].strip()[8:]),
int(s[2].strip()),
float(s[3].strip()),
float(s[4].strip())
))
schema = StructType([
StructField("stationid", StringType(), True),
StructField("date", StringType(), True),
StructField("hour", IntegerType(), True),
StructField("qn_9", IntegerType(), True),
StructField("tt_tu", FloatType(), True),
StructField("rf_tu", FloatType(), True)
])
df = spark.createDataFrame(lines, schema)
df.createOrReplaceTempView("cdc_hourly")
outfile = HDFS_HOME + "home/kramlingermike/" + "cdc_hourly.parquet"
df.write.mode("overwrite").parquet(outfile)
def get_hourly_station(spark, stationid, limit=20):
result = spark.sql(f"""
SELECT *
FROM cdc_hourly
WHERE stationid = '{stationid}'
ORDER BY date, hour
LIMIT {limit}
""")
result.show(truncate=False)
def avg_temp_per_day(spark, stationid, limit=20):
result = spark.sql(f"""
SELECT date, ROUND(AVG(tt_tu),2) AS avg_temp
FROM cdc_hourly
WHERE stationid = '{stationid}'
GROUP BY date
ORDER BY date
LIMIT {limit}
""")
result.show(truncate=False)
# Remove the first two lines (header and separator line)
rdd_station_filterd = (rdd_station
.zipWithIndex() # every line gets an index
.filter(lambda x: x[1] >= 2) # keep only lines with index >= 2
.map(lambda x: x[0])) # drop the index again
rdd_station_splitlines = rdd_station_filterd.map(
lambda l: (
int(l[:6].strip()), # Station ID
l[6:15], # von_datum
l[15:24], # bis_datum
float(l[24:40].strip()), # station elevation (hoehe)
float(l[40:53].strip()), # latitude (geo_breite)
float(l[53:61].strip()), # longitude (geo_laenge)
l[61:142], # station name
l[142:-1] # federal state (bundesland)
))
# Define the data schema
stationschema = StructType(
[
StructField("stationId", IntegerType(), True),
StructField("von_datum", StringType(), True),
StructField("bis_datum", StringType(), True),
StructField("hoehe", FloatType(), True),
StructField("geo_breite", FloatType(), True),
StructField("geo_laenge", FloatType(), True),
StructField("station_name", StringType(), True),
StructField("bundesland", StringType(), True)
]
)
# Create the DataFrame
stationframe = spark.createDataFrame(rdd_station_splitlines, schema=stationschema)
stationframe.printSchema()
# Create a temporary view
stationframe.createOrReplaceTempView("german_stations")
# Save the DataFrame to HDFS
stationframe.write.mode("overwrite").parquet(
HDFSPATH + "home/heiserervalentin/german_stations.parquet"
)
def read_data_from_parquet(spark):
"""
read_data_from_parquet(spark)
"""
df = spark.read.parquet(HDFSPATH + "home/heiserervalentin/german_stations.parquet")
df.createOrReplaceTempView("german_stations")
df.cache()
def sql_querys(spark):
"""
sql_querys(spark)
"""
spark.sql("SELECT * FROM german_stations").show(5, truncate=False)
spark.sql("SELECT COUNT(*) AS Anzahl FROM german_stations").show()
spark.sql("SELECT MAX(geo_breite) FROM german_stations").show()
df = spark.sql("SELECT * FROM german_stations").toPandas()
plt.figure(figsize=[6,6])
plt.scatter(df.geo_laenge, df.geo_breite, marker='.', color = 'r')
plt.show()
def import_produkt_files(spark: SparkSession, scon: SparkContext, path='/data/cdc/hourly/'):
"""
import_produkt_files(spark, scon)
"""
# Read the data into an RDD
rdd_produkt = scon.textFile(f"{path}/produkt*")
# Filter out the header line and strip whitespace
rdd_filterd = rdd_produkt \
.filter(lambda l: l != 'STATIONS_ID;MESS_DATUM;QN_9;TT_TU;RF_TU;eor') \
.map(lambda l: [x.strip() for x in l.split(';')])
# Split the lines into fields
rdd_produkt_splitlines = rdd_filterd.map(
lambda l: (
int(l[0]), # Stat_id
l[1][:8], # measurement date
int(l[1][8:10]), # measurement hour
int(l[2]), # quality level (QN_9)
float(l[3]), # air temperature (TT_TU)
float(l[4]), # relative humidity (RF_TU)
int(l[1][0:4]) # year
)
)
print(rdd_produkt_splitlines.take(5))
# Define the data schema
product_schema = StructType(
[
StructField("stationId", IntegerType(), True),
StructField("date", StringType(), True),
StructField("hour", IntegerType(), True),
StructField("QN_9", IntegerType(), True),
StructField("TT_TU", FloatType(), True),
StructField("RF_TU", FloatType(), True),
StructField("jahr", IntegerType(), True)
]
)
product_frame = spark.createDataFrame(rdd_produkt_splitlines, schema=product_schema)
product_frame.printSchema()
product_frame.createOrReplaceTempView("german_stations_data")
product_frame.write.mode("overwrite").parquet(
HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
)
def read_product_data_from_parquet(spark):
"""
read_product_data_from_parquet(spark)
"""
df = spark.read.parquet(HDFSPATH + "home/heiserervalentin/german_stations_data.parquet")
df.createOrReplaceTempView("german_stations_data")
df.cache()
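# Illustrative sketch (hypothetical helper): with german_stations_data loaded
# from parquet, daily mean temperatures per station can be derived from the
# hourly TT_TU values as a quick plausibility check. Station 4271 is the
# example id used elsewhere in this file.
def avg_temp_per_day_example(spark: SparkSession, station_id: int = 4271, limit: int = 10) -> None:
    """Show average daily temperature for one station."""
    spark.sql(
        f"""
        SELECT date, ROUND(AVG(TT_TU), 2) AS avg_temp
        FROM german_stations_data
        WHERE stationId = {station_id}
        GROUP BY date
        ORDER BY date
        LIMIT {limit}
        """
    ).show(truncate=False)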
def main(scon, spark):
"""
main(scon, spark)
"""
# Import the data
import_data(spark, scon)
read_data_from_parquet(spark)
sql_querys(spark)
print("a)")
a(scon, spark)
print("Beispielabfrage: (Alle Stationen:)")
get_all_cdc_stations(spark)
print("Beispielabfrage: (Alle Stationen pro Bundesland)")
get_cdc_stations_per_state(spark)
print("b)")
b(scon, spark)
print("Beispielabfrage: (Alle Daten für eine Station:)")
get_hourly_station(spark, "4271")
print("Beispielabfrage: (Durchschnittliche Temperatur pro Tag für eine Station:)")
avg_temp_per_day(spark, "4271")
import_produkt_files(spark, scon)
read_product_data_from_parquet(spark)
if __name__ == "__main__":
main(scon, spark)