This commit is contained in:
2025-11-28 14:14:27 +01:00
parent 296d1c8978
commit c072850289
3 changed files with 315 additions and 1014 deletions

View File

@@ -1,219 +1,186 @@
from sparkstart import scon, spark
import ghcnd_stations
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
import time
import matplotlib.pyplot as plt
# a) Scatterplot: alle Stationen (lon/lat)
def plot_all_stations(spark):
q = """
SELECT stationname, latitude, longitude
FROM ghcndstations
WHERE latitude IS NOT NULL AND longitude IS NOT NULL
"""
t0 = time.time()
rows = spark.sql(q).collect()
t1 = time.time()
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Rows: {len(rows)}")
lats = [r['latitude'] for r in rows]
lons = [r['longitude'] for r in rows]
names = [r['stationname'] for r in rows]
plt.figure(figsize=(8,6))
plt.scatter(lons, lats, s=10, alpha=0.6)
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('Alle GHCND-Stationen (Scatter)')
plt.grid(True)
plt.show()
HDFSPATH = "hdfs://193.174.205.250:54310/"
# b) Scatterplot: Stationsdauer in Jahren als Marker-Size (aus ghcndinventory: firstyear/lastyear)
def plot_station_duration(spark, size_factor=20):
q = """
SELECT
s.stationname,
s.latitude,
s.longitude,
(COALESCE(i.lastyear, year(current_date())) - COALESCE(i.firstyear, year(current_date()))) AS years
FROM ghcndstations s
LEFT JOIN ghcndinventory i ON s.stationid = i.stationid
WHERE s.latitude IS NOT NULL AND s.longitude IS NOT NULL
"""
t0 = time.time()
rows = spark.sql(q).collect()
t1 = time.time()
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Rows: {len(rows)}")
def read_parquets(spark: SparkSession):
stations_path = HDFSPATH + "home/heiserervalentin/german_stations.parquet"
products_path = HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
lats = [r['latitude'] for r in rows]
lons = [r['longitude'] for r in rows]
years = [r['years'] if r['years'] is not None else 0 for r in rows]
sizes = [max(5, (y+1) * size_factor) for y in years]
stations_df = spark.read.parquet(stations_path)
stations_df.createOrReplaceTempView("german_stations")
plt.figure(figsize=(8,6))
plt.scatter(lons, lats, s=sizes, alpha=0.6)
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('GHCND-Stationen: Dauer der Verfuegbarkeit (Größe ~ Jahre)')
plt.grid(True)
plt.show()
products_df = spark.read.parquet(products_path)
products_df.createOrReplaceTempView("german_stations_data")
stations_df.cache()
products_df.cache()
def plot_frost_distribution_year(spark, year):
q = f"""
WITH daily_max AS (
SELECT stationid, date, MAX(CAST(value AS DOUBLE))/10.0 AS max_temp
FROM ghcnddata
WHERE element = 'TMAX'
AND length(date) >= 4
AND substr(date,1,4) = '{year}'
GROUP BY stationid, date
),
station_frost AS (
SELECT dm.stationid, SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays
FROM daily_max dm
GROUP BY dm.stationid
)
SELECT sf.frostdays, COUNT(*) AS stations
FROM station_frost sf
GROUP BY sf.frostdays
ORDER BY sf.frostdays
"""
t0 = time.time()
rows = spark.sql(q).collect()
t1 = time.time()
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Distinct frostdays: {len(rows)}")
def plot_all_stations(spark: SparkSession):
q = "SELECT geo_laenge AS lon, geo_breite AS lat FROM german_stations WHERE geo_laenge IS NOT NULL AND geo_breite IS NOT NULL"
df = spark.sql(q)
if not rows:
print(f"Keine Daten f\u00fcr Jahr {year}.")
return
x = [r['frostdays'] for r in rows]
y = [r['stations'] for r in rows]
plt.figure(figsize=(8,5))
plt.bar(x, y)
plt.xlabel('Anzahl Frosttage im Jahr ' + str(year))
plt.ylabel('Anzahl Stationen')
plt.title(f'Verteilung der Frosttage pro Station im Jahr {year}')
plt.grid(True)
plt.show()
pdf = df.toPandas()
plt.figure(figsize=(8, 6))
plt.scatter(pdf.lon, pdf.lat, s=6, color='red', marker='.')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('All Stations (locations)')
plt.tight_layout()
plt.show()
# c2) Frosttage Zeitreihe für eine Station mit 5- und 20-Jahres Durchschnitt (SQL window)
def plot_station_frost_timeseries(spark, station_name):
q = f"""
WITH daily_max AS (
SELECT stationid, date, MAX(CAST(value AS DOUBLE))/10.0 AS max_temp
FROM ghcnddata
WHERE element = 'TMAX'
GROUP BY stationid, date
),
yearly AS (
SELECT
dm.stationid,
CAST(substr(dm.date,1,4) AS INT) AS year,
SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays
FROM daily_max dm
GROUP BY dm.stationid, CAST(substr(dm.date,1,4) AS INT)
),
station_yearly AS (
SELECT
y.year,
y.frostdays,
AVG(y.frostdays) OVER (ORDER BY y.year ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg5,
AVG(y.frostdays) OVER (ORDER BY y.year ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg20
FROM yearly y
JOIN ghcndstations s ON y.stationid = s.stationid
WHERE trim(upper(s.stationname)) = '{station_name.upper()}'
ORDER BY y.year
)
SELECT * FROM station_yearly
"""
t0 = time.time()
rows = spark.sql(q).collect()
t1 = time.time()
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Years: {len(rows)}")
def duration_circle_size(spark: SparkSession):
q = (
"SELECT stationId, geo_laenge AS lon, geo_breite AS lat, "
"(CAST(SUBSTR(bis_datum,1,4) AS INT) - CAST(SUBSTR(von_datum,1,4) AS INT)) AS duration_years "
"FROM german_stations "
"WHERE TRIM(von_datum)<>'' AND TRIM(bis_datum)<>''"
)
df = spark.sql(q)
if not rows:
print(f"Keine Daten f\u00fcr Station '{station_name}'.")
return
pdf = df.toPandas()
years = [r['year'] for r in rows]
frostdays = [r['frostdays'] for r in rows]
avg5 = [r['avg5'] for r in rows]
avg20 = [r['avg20'] for r in rows]
pdf['duration_years'] = pdf['duration_years'].fillna(0).astype(int)
sizes = (pdf['duration_years'].clip(lower=0) + 1) * 6
plt.figure(figsize=(10,5))
plt.plot(years, frostdays, label='Frosttage (Jahr)')
plt.plot(years, avg5, label='5-Jahres-Durchschnitt')
plt.plot(years, avg20, label='20-Jahres-Durchschnitt')
plt.xlabel('Jahr')
plt.ylabel('Anzahl Frosttage')
plt.title(f'Frosttage f\u00fcr Station {station_name}')
plt.legend()
plt.grid(True)
plt.show()
plt.figure(figsize=(8, 6))
plt.scatter(pdf.lon, pdf.lat, s=sizes, alpha=0.6, c=pdf['duration_years'], cmap='viridis')
plt.colorbar(label='Duration (years)')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('Stations with duration (years) as marker size')
plt.tight_layout()
plt.show()
# d) Korrelation Hoehe (elevation) vs. Frosttage pro Jahr
def plot_height_frost_correlation(spark):
q = """
WITH daily_max AS (
SELECT stationid, date, MAX(CAST(value AS DOUBLE))/10.0 AS max_temp
FROM ghcnddata
WHERE element = 'TMAX'
GROUP BY stationid, date
),
yearly AS (
SELECT
dm.stationid,
CAST(substr(dm.date,1,4) AS INT) AS year,
SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays
FROM daily_max dm
GROUP BY dm.stationid, CAST(substr(dm.date,1,4) AS INT)
),
joined AS (
SELECT y.year, s.elevation, y.frostdays
FROM yearly y
JOIN ghcndstations s ON y.stationid = s.stationid
WHERE s.elevation IS NOT NULL
),
yearly_corr AS (
SELECT year, corr(elevation, frostdays) AS corr
FROM joined
GROUP BY year
ORDER BY year
)
SELECT year, corr FROM yearly_corr WHERE corr IS NOT NULL
"""
t0 = time.time()
rows = spark.sql(q).collect()
t1 = time.time()
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Years with corr: {len(rows)}")
def compute_daily_and_yearly_frosts(spark: SparkSession):
q_daily_max = (
"SELECT stationId, date, SUBSTR(date,1,4) AS year, MAX(TT_TU) AS max_temp "
"FROM german_stations_data "
"GROUP BY stationId, date"
)
daily_max = spark.sql(q_daily_max)
daily_max.createOrReplaceTempView('daily_max')
if not rows:
print("Keine Korrelationsdaten verfügbar.")
return
# mark a day as frost if max_temp < 0
q_daily_frost = (
"SELECT stationId, year, CASE WHEN max_temp < 0 THEN 1 ELSE 0 END AS is_frost "
"FROM daily_max"
)
daily_frost = spark.sql(q_daily_frost)
daily_frost.createOrReplaceTempView('daily_frost')
years = [r['year'] for r in rows]
corr = [r['corr'] for r in rows]
# yearly frostdays per station
q_station_year = (
"SELECT stationId, year, SUM(is_frost) AS frost_days "
"FROM daily_frost GROUP BY stationId, year"
)
station_year_frost = spark.sql(q_station_year)
station_year_frost.createOrReplaceTempView('station_year_frost')
plt.figure(figsize=(10,5))
plt.bar(years, corr)
plt.xlabel('Jahr')
plt.ylabel('Korrelationskoeffizient (elevation vs frostdays)')
plt.title('Korrelation Hoehe (elevation) vs. Frosttage pro Jahr')
plt.grid(True)
plt.show()
def frost_analysis(spark: SparkSession, year=2024, station_name_matches=('kempten',)):
compute_daily_and_yearly_frosts(spark)
q_hist = (
f"SELECT frost_days, COUNT(*) AS station_count "
f"FROM station_year_frost WHERE year = '{year}' GROUP BY frost_days ORDER BY frost_days"
)
hist_df = spark.sql(q_hist)
hist_pdf = hist_df.toPandas()
plt.figure(figsize=(8, 5))
plt.bar(hist_pdf.frost_days, hist_pdf.station_count, color='steelblue')
plt.xlabel('Number of Frost Days in year ' + str(year))
plt.ylabel('Number of Stations')
plt.title(f'Stations vs Frost Days ({year})')
plt.tight_layout()
plt.show()
for name in station_name_matches:
q_find = f"SELECT stationId, station_name FROM german_stations WHERE lower(station_name) LIKE '%{name.lower()}%'"
ids_df = spark.sql(q_find)
ids = ids_df.collect()
if not ids:
print(f"No stations found matching '{name}'")
continue
for r in ids:
sid = r['stationId']
sname = r['station_name']
print(f"Analyzing stationId={sid} name={sname}")
# compute frostdays + 5-yr and 20-yr rolling averages using window frame
q_ts = (
"SELECT year, frost_days, "
"AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_5, "
"AVG(frost_days) OVER (PARTITION BY stationId ORDER BY CAST(year AS INT) ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg_20 "
f"FROM station_year_frost WHERE stationId = {sid} ORDER BY CAST(year AS INT)"
)
ts_df = spark.sql(q_ts)
pdf = ts_df.toPandas()
if pdf.empty:
print(f"No yearly frost data for station {sid}")
continue
pdf['year'] = pdf['year'].astype(int)
plt.figure(figsize=(10, 5))
plt.plot(pdf.year, pdf.frost_days, label='Frostdays (year)', marker='o')
plt.plot(pdf.year, pdf.avg_5, label='5-year avg', linestyle='--')
plt.plot(pdf.year, pdf.avg_20, label='20-year avg', linestyle=':')
plt.xlabel('Year')
plt.ylabel('Frost Days')
plt.title(f'Frost Days over Years for {sname} (station {sid})')
plt.legend()
plt.tight_layout()
plt.show()
def height_frost_correlation(spark: SparkSession):
compute_daily_and_yearly_frosts(spark)
q_corr = (
"SELECT syf.year AS year, corr(s.hoehe, syf.frost_days) AS height_frost_corr "
"FROM station_year_frost syf JOIN german_stations s ON syf.stationId = s.stationId "
"GROUP BY syf.year ORDER BY CAST(syf.year AS INT)"
)
corr_df = spark.sql(q_corr)
corr_pdf = corr_df.toPandas()
corr_pdf = corr_pdf.dropna(subset=['height_frost_corr'])
if corr_pdf.empty:
print("No non-NaN correlation values found.")
return
corr_pdf['year'] = corr_pdf['year'].astype(int)
plt.figure(figsize=(10, 5))
plt.bar(corr_pdf.year, corr_pdf.height_frost_corr, color='orange')
plt.xlabel('Year')
plt.ylabel('Correlation (height vs frostdays)')
plt.title('Yearly correlation: station height vs number of frost days')
plt.tight_layout()
plt.show()
def main(scon, spark):
read_parquets(spark)
plot_all_stations(spark)
duration_circle_size(spark)
frost_analysis(spark, year=2024, station_name_matches=('kempten',))
height_frost_correlation(spark)
if __name__ == '__main__':
ghcnd_stations.read_ghcnd_from_parquet(spark)
main(scon, spark)
plot_all_stations(spark)
plot_station_duration(spark)
plot_frost_distribution_year(spark, '2010')
plot_station_frost_timeseries(spark, 'KEMPTEN')
plot_height_frost_correlation(spark)
pass

View File

@@ -1,689 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Load stations, countries, inventory and data from GHCND as Dataset.
@author: steger
"""
# pylint: disable=pointless-string-statement
import os
from datetime import date
from time import time
from subprocess import call
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DateType
# =============================================
# run sparkstart.py before to create a session
# =============================================
HDFSPATH = "hdfs://193.174.205.250:54310/"
GHCNDPATH = HDFSPATH + "ghcnd/"
GHCNDHOMEPATH = "/data/ghcnd/"
def conv_elevation(elev):
"""
Convert an elevation value.
-999.9 means there is no value.
Parameters
----------
elev : string
The elevation to convert to float.
Returns
-------
res : numeric
The converted value as float.
"""
elev = elev.strip()
if elev == "-999.9":
res = None
else:
res = float(elev)
return res
def conv_data_value(line, start):
"""
Convert a single data value from a dly.- File.
Parameters
----------
line : string
The line with the data value.
start : int
The index at which the value starts.
Returns
-------
res : numeric
The onverted data value as int.
"""
return int(line[start:start+5].strip())
def import_ghcnd_stations(scon, spark, path):
"""
Read the station data into a dataframe.
Register it as temporary view and write it to parquet.
Parameters
----------
scon : SparkContext
The spark context.
spark : SparkSession
The SQL session.
Returns
-------
stationFrame : DataFrame
The spark Data Frame with the stations data.
"""
stationlines = scon.textFile(path + "ghcnd-stations.txt")
stationsplitlines = stationlines.map(
lambda l:
(l[0:2],
l[2:3],
l[0:11],
float(l[12:20].strip()),
float(l[21:30].strip()),
conv_elevation(l[31:37]),
l[41:71]
))
stationschema = StructType([
StructField('countrycode', StringType(), True),
StructField('networkcode', StringType(), True),
StructField('stationid', StringType(), True),
StructField('latitude', FloatType(), True),
StructField('longitude', FloatType(), True),
StructField('elevation', FloatType(), True),
StructField('stationname', StringType(), True)
])
stationframe = spark.createDataFrame(stationsplitlines,
schema=stationschema)
stationframe.createOrReplaceTempView("ghcndstations")
stationframe.write.mode('overwrite').parquet(
GHCNDPATH + "ghcndstations.parquet")
stationframe.cache()
print("Imported GhcndStations")
return stationframe
def import_ghcnd_countries(scon, spark, path):
"""
Read the countries data into a dataframe.
Register it as temptable and write it to parquet.
Parameters
----------
scon : SparkContext
The spark context.
spark : SparkSession
The SQL session.
path : string
The path where the file with data resides.
Returns
-------
stationFrame : DataFrame
The spark Data Frame with the countries data.
"""
countrylines = scon.textFile(path + "ghcnd-countries.txt")
countrysplitlines = countrylines.map(lambda l: (l[0:2], l[2:50]))
countryschema = StructType([
StructField('countrycode', StringType(), True),
StructField('countryname', StringType(), True)])
countryframe = spark.createDataFrame(countrysplitlines, countryschema)
countryframe.createOrReplaceTempView("ghcndcountries")
countryframe.write.mode('overwrite').parquet(
GHCNDPATH + "ghcndcountries.parquet")
countryframe.cache()
print("Imported GhcndCountries")
return countryframe
def conv_data_line(line):
"""
Convert a data line from GHCND-Datafile (.dly).
Parameters
----------
line : string
String with a data line containing the values for one month.
Returns
-------
list of tuple
List containing a tuple for each data value.
"""
if line == '':
return []
countrycode = line[0:2]
networkcode = line[2:3]
stationid = line[0:11]
year = int(line[11:15])
month = int(line[15:17])
element = line[17:21]
datlst = []
for i in range(0, 30):
val = conv_data_value(line, 21 + i*8)
if val != -9999:
datlst.append((countrycode, networkcode, stationid,
year, month, i+1,
date(year, month, i+1),
element,
val))
return datlst
def read_dly_file(scon, spark, filename):
"""
Read a .dly-file into a data frame.
Parameters
----------
scon : SparkContext
The spark context.
spark : SparkSession
The SQL session.
filename : string
The name and path of the dly-File.
Returns
-------
RDD
The RDD with the contents of the dly-File.
"""
dly = scon.textFile(filename)
return process_dly_file_lines(spark, dly)
def process_dly_file_lines(spark, lines):
"""
Process the lines of one dly file.
Parameters
----------
spark : SparkSession
The SQL session.
lines : RDD
RDD with one value per line.
Returns
-------
dlyFrame : DataFram
Data Frame containing the data of the file.
"""
dlsplit = lines.flatMap(conv_data_line)
dlyfileschema = StructType([
StructField('countrycode', StringType(), True),
StructField('networkcode', StringType(), True),
StructField('stationid', StringType(), True),
StructField('year', IntegerType(), True),
StructField('month', IntegerType(), True),
StructField('day', IntegerType(), True),
StructField('date', DateType(), True),
StructField('element', StringType(), True),
StructField('value', IntegerType(), True)
])
dlyframe = spark.createDataFrame(dlsplit, dlyfileschema)
return dlyframe
def import_data_rdd_parallel(scon, spark, path):
"""
Import the data files from ghcnd in parallel.
This is much faster on a cluster or a computer with many cores
and enough main memory to hold all the raw data.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
Returns
-------
None.
"""
rdd = scon.textFile(
path+"/ghcnd_all/*.dly", minPartitions=5000)
rddcoa = rdd.coalesce(5000)
rddsplit = rddcoa.flatMap(conv_data_line)
print("Number of data records = " + str(rddsplit.count()))
print("Number of partitions = " + str(rddsplit.getNumPartitions()))
dlyfileschema = StructType([
StructField('countrycode', StringType(), True),
StructField('networkcode', StringType(), True),
StructField('stationid', StringType(), True),
StructField('year', IntegerType(), True),
StructField('month', IntegerType(), True),
StructField('day', IntegerType(), True),
StructField('date', DateType(), True),
StructField('element', StringType(), True),
StructField('value', IntegerType(), True)
])
dlyframe = spark.createDataFrame(rddsplit, dlyfileschema)
dlyframe.show(10)
dlyframe.write.mode('overwrite').parquet(
GHCNDPATH + "ghcnddata.parquet")
print(os.system("hdfs dfs -du -s /ghcnd/ghcnddata.parquet"))
def import_data_rdd_parallel_whole(scon, spark, path):
"""
Import the data files from ghcnd in parallel.
This is much faster on a cluster or a computer with many cores
and enough main memory to hold all the raw data.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
Returns
-------
None.
"""
rdd = scon.wholeTextFiles(
path+"/ghcnd_all/*.dly", minPartitions=5000 )
rddvals = rdd.values()
print("Number of files in GHCND = " + str(rddvals.count()))
rddlen = rddvals.map(len)
print("Number of characters in all files = " +
str(rddlen.reduce(lambda x, y: x + y)))
rddlines = rddvals.flatMap(lambda x: x.split("\n"))
print("Number of lines with data = " + str(rddlines.count()))
rddsplit = rddlines.flatMap(conv_data_line)
print("Number of data records = " + str(rddsplit.count()))
print("Number of partitions = " + str(rddsplit.getNumPartitions()))
dlyfileschema = StructType([
StructField('countrycode', StringType(), True),
StructField('networkcode', StringType(), True),
StructField('stationid', StringType(), True),
StructField('year', IntegerType(), True),
StructField('month', IntegerType(), True),
StructField('day', IntegerType(), True),
StructField('date', DateType(), True),
StructField('element', StringType(), True),
StructField('value', IntegerType(), True)
])
dlyframe = spark.createDataFrame(rddsplit, dlyfileschema)
dlyframe.show(10)
dlyframe.write.mode('overwrite').parquet(
GHCNDPATH + "ghcnddata.parquet")
print(os.system("hdfs dfs -du -s /ghcnd/ghcnddata.parquet"))
"""
Code for testing problems that resulted finally from empty lines
to solve the problem the code
if line == '':
return []
was added at the beginning of convDataLine to filter away empty lines:
noyear = rddsplit.filter(lambda x: not x[3].isnumeric())
noyear.collect()
rddlines1 = rdd.flatMap(lambda x: [(x[0], y) for y in x[1].split("\n")])
print(rddlines1.count())
rddsplit1 = rddlines1.flatMap(convDataLine1)
print(rddsplit1.count())
noyear1 = rddsplit1.filter(lambda x: not x[1][3].isnumeric())
noyear1.collect()
"""
def import_ghcnd_files_extern(scon, spark, path, stationlist, batchsize,
numparts):
"""
Import multiple data files in one batch.
Import batchsize data files in one batch and append the data into
the parquet file.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
path : string
Path of the data files.
stationlist : list
List of all stations to load.
batchsize : int
Number of files to load in one batch.
numparts : int
Number of partitions to write one batch.
Returns
-------
None.
"""
data = None
count = 0
allcount = 0
batchcount = 0
for station in stationlist:
# filename = "file://" + path + "/" + station + ".dly"
filename = path + station + ".dly"
if os.path.isfile(filename):
dly = read_dly_file(spark, scon, "file://" + filename)
if data is not None:
data = data.union(dly)
print("Batch " + str(batchcount) +
" Filenr " + str(count) + " Processing " + filename)
else:
tstart = time()
data = dly
count += 1
if count >= batchsize:
# data = data.sort('countrycode', 'stationid', 'date')
data = data.coalesce(numparts)
tcoalesce = time()
data.write.mode('Append').parquet(
GHCNDPATH + "ghcnddata.parquet")
anzrec = data.count()
twrite = time()
print(
"\n\nBatch " + str(batchcount) +
" #recs " + str(anzrec) +
" #files " + str(allcount) +
" readtime " + str.format("{:f}", tcoalesce - tstart) +
" writetime " + str.format("{:f}", twrite - tcoalesce) +
" recs/sec " +
str.format("{:f}", anzrec / (twrite - tstart)) + "\n\n")
allcount += count
count = 0
batchcount += 1
data = None
else:
print("importGhcndFilesExtern: " + station +
", " + filename + " not found")
if data is not None:
data = data.coalesce(numparts)
data.write.mode('Append').parquet(GHCNDPATH + "ghcnddata.parquet")
def import_all_data(scon, spark, path):
"""
Import all data from GHCND.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
path : string
Path of data files.
Returns
-------
None.
"""
stationlist = spark.sql(
"SELECT stationid AS station \
FROM ghcndstations \
ORDER BY station")
pds = stationlist.toPandas()
import_ghcnd_files_extern(scon, spark, path + "ghcnd_all/",
pds.station, 30, 1)
def import_data_single_files(scon, spark, stationlist, parquetname, path):
"""
Import the data files one by one.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
stationlist : list
List of all stations to import data.
parquetname : string
Name of the parquet file to write the data to.
path : string
Path where the data files reside.
Returns
-------
None.
"""
pds = stationlist.toPandas()
cnt = 0
for station in pds.station:
filename = path + station + ".dly"
if os.path.isfile(filename):
start = time()
dly = read_dly_file(spark, scon, "file://" + filename)
numrec = dly.count()
dly = dly.coalesce(1).sort('element', 'date')
read = time()
dly.write.mode('Append').parquet(GHCNDPATH
+ parquetname + ".parquet")
finish = time()
print(str.format(
"{:8d} ", cnt) + station +
" #rec " + str.format("{:7d}", numrec) +
" read " + str.format("{:f}", read - start) +
" write " + str.format("{:f}", finish - read) +
" write/sec " + str.format("importDataSingleFiles{:f} ",
numrec/(finish - read))
+ " " + filename)
else:
print("#### " + str(cnt) + " File " +
filename + " does not exist ####")
cnt += 1
def check_files(spark):
"""
Check if some files for generated stationnames do not exist.
Parameters
----------
spark : SparkSession
The SQL session.
Returns
-------
None.
"""
stationlist = spark.sql(
"SELECT CONCAT(countrycode, networkcode, stationid) AS station \
FROM ghcndstations \
ORDER BY station")
pds = stationlist.toPandas()
count = 1
for station in pds.station:
filename = "/nfs/home/steger/ghcnd/ghcnd_all/" + station + ".dly"
if os.path.isfile(filename):
# print(str(count) + " " + station)
pass
else:
print(str(count) + " File does not exist: " + filename)
count += 1
"""
Read the inventory data into a dataframe,
register it as temporary view and write it to parquet
"""
def import_ghcnd_inventory(scon, spark, path):
"""
Import inventory information from GHCND.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
path : string
Path for inventory file.
Returns
-------
invframe : DataFrame
Data Frame with inventory data.
"""
invlines = scon.textFile(path + "ghcnd-inventory.txt")
invsplitlines = invlines.map(
lambda l:
(l[0:2],
l[2:3],
l[0:11],
float(l[12:20].strip()),
float(l[21:30].strip()),
l[31:35],
int(l[36:40]),
int(l[41:45])
))
invschema = StructType([
StructField('countrycode', StringType(), True),
StructField('networkcode', StringType(), True),
StructField('stationid', StringType(), True),
StructField('latitude', FloatType(), True),
StructField('longitude', FloatType(), True),
StructField('element', StringType(), True),
StructField('firstyear', IntegerType(), True),
StructField('lastyear', IntegerType(), True)
])
invframe = spark.createDataFrame(invsplitlines, invschema)
invframe.createOrReplaceTempView("ghcndinventory")
invframe.write.mode('overwrite').parquet(
GHCNDPATH + "ghcndinventory.parquet")
invframe.cache()
print("Imported GhcndInventory")
return invframe
def import_ghcnd_all(scon, spark):
"""
Import all files from GHCND.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
Returns
-------
None.
"""
localfilepath = "file://" + GHCNDHOMEPATH
import_ghcnd_countries(scon, spark, localfilepath)
import_ghcnd_stations(scon, spark, localfilepath)
import_ghcnd_inventory(scon, spark, localfilepath)
# import_all_data(scon, spark, GHCNDHOMEPATH)
import_data_rdd_parallel(scon, spark, localfilepath)
def read_ghcnd_from_parquet(spark):
"""
Read all data from the parquet files into Dataframes.
Create temporary views from the parquet files.
Parameters
----------
spark : SparkSession
The SQL Session.
Returns
-------
None.
"""
dfcountries = spark.read.parquet(GHCNDPATH + "ghcndcountries")
dfcountries.createOrReplaceTempView("ghcndcountries")
dfcountries.cache()
dfstations = spark.read.parquet(GHCNDPATH + "ghcndstations")
dfstations.createOrReplaceTempView("ghcndstations")
dfstations.cache()
dfinventory = spark.read.parquet(GHCNDPATH + "ghcndinventory")
dfinventory.createOrReplaceTempView("ghcndinventory")
dfinventory.cache()
dfdata = spark.read.parquet(GHCNDPATH + "ghcnddata")
dfdata.createOrReplaceTempView("ghcnddata")
dfdata.cache()
def delete_all_parquet_ghcnd():
"""
Delete all parquet files that were imported from GHCND.
Returns
-------
None.
"""
delete_from_hdfs(GHCNDPATH + "ghcndstations.parquet")
delete_from_hdfs(GHCNDPATH + "ghcndcountries.parquet")
delete_from_hdfs(GHCNDPATH + "ghcndinventory.parquet")
delete_from_hdfs(GHCNDPATH + "ghcnddata.parquet")
def delete_from_hdfs(path):
"""
Delete the file in path from HDFS.
Parameters
----------
path : string
Path of the file in HDFS.
Returns
-------
None.
"""
call("hdfs dfs -rm -R " + path,
shell=True)

View File

@@ -1,144 +1,167 @@
from sparkstart import scon, spark
from pyspark import SparkContext, rdd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, FloatType
from pyspark.sql import Row
import pyspark.sql.functions as F
import re
import matplotlib.pyplot as plt
CDC_PATH = "/data/cdc/hourly/"
HDFS_HOME = "hdfs://193.174.205.250:54310/"
HDFSPATH = "hdfs://193.174.205.250:54310/"
GHCNDPATH = HDFSPATH + "ghcnd/"
GHCNDHOMEPATH = "/data/ghcnd/"
# a) Stationsdaten einlesen & als Parquet speichern
def a(scon, spark, path=CDC_PATH):
stationlines = scon.textFile(path + "TU_Stundenwerte_Beschreibung_Stationen.txt")
# Aufgabe 9 a
stationlines = stationlines.zipWithIndex().filter(lambda x: x[1] >= 2).map(lambda x: x[0])
stationsplitlines = stationlines.map(lambda l: (
l[0:5].strip(),
l[6:14].strip(),
l[15:23].strip(),
int(l[24:41].strip()),
float(l[42:52].strip()),
float(l[53:61].strip()),
l[61:101].strip(),
l[102:].strip()
))
def import_data(spark: SparkSession, scon: SparkContext):
"""
%time import_data(spark, scon)
"""
stationschema = StructType([
StructField('stationid', StringType(), True),
StructField('from_date', StringType(), True),
StructField('to_date', StringType(), True),
StructField('height', IntegerType(), True),
StructField('latitude', FloatType(), True),
StructField('longitude', FloatType(), True),
StructField('stationname', StringType(), True),
StructField('state', StringType(), True)
])
stationframe = spark.createDataFrame(stationsplitlines, schema=stationschema)
stationframe.createOrReplaceTempView("cdc_stations")
outfile = HDFS_HOME + "/home/kramlingermike/" + "cdc_stations.parquet"
stationframe.write.mode('overwrite').parquet(outfile)
stationframe.cache()
# a) Beispielabfrage
def get_all_cdc_stations(spark):
result = spark.sql(f"""
SELECT *
FROM cdc_stations
ORDER BY stationname
""")
result.show(truncate=False)
# a) Beispielabfrage
def get_cdc_stations_per_state(spark):
result = spark.sql(f"""
SELECT
state,
COUNT(*) AS count
FROM cdc_stations
GROUP BY state
ORDER BY count DESC
""")
result.show(truncate=False)
def b(scon, spark):
lines = scon.textFile(CDC_PATH + "produkt*")
# Daten in RDD einlesen
rdd_station = scon.textFile("/data/cdc/hourly/TU_Stundenwerte_Beschreibung_Stationen.txt")
lines = lines.filter(lambda line: not line.startswith("STATIONS_ID"))
lines = lines.zipWithIndex().filter(lambda x: x[1] >= 0).map(lambda x: x[0])
lines = lines.map(lambda l: l.split(";"))
lines = lines.map(lambda s: (
s[0].strip(),
s[1].strip()[:8],
int(s[1].strip()[8:]),
int(s[2].strip()),
float(s[3].strip()),
float(s[4].strip())
))
schema = StructType([
StructField("stationid", StringType(), True),
StructField("date", StringType(), True),
StructField("hour", IntegerType(), True),
StructField("qn_9", IntegerType(), True),
StructField("tt_tu", FloatType(), True),
StructField("rf_tu", FloatType(), True)
])
df = spark.createDataFrame(lines, schema)
df.createOrReplaceTempView("cdc_hourly")
outfile = HDFS_HOME + "home/kramlingermike/" + "cdc_hourly.parquet"
df.write.mode("overwrite").parquet(outfile)
def get_hourly_station(spark, stationid, limit=20):
result = spark.sql(f"""
SELECT *
FROM cdc_hourly
WHERE stationid = '{stationid}'
ORDER BY date, hour
LIMIT {limit}
""")
result.show(truncate=False)
def avg_temp_per_day(spark, stationid, limit=20):
result = spark.sql(f"""
SELECT date, ROUND(AVG(tt_tu),2) AS avg_temp
FROM cdc_hourly
WHERE stationid = '{stationid}'
GROUP BY date
ORDER BY date
LIMIT {limit}
""")
result.show(truncate=False)
# Entfernen der ersten beiden Zeilen (Header und Trennzeile)
rdd_station_filterd = (rdd_station
.zipWithIndex() # jede Zeile bekommt idx
.filter(lambda x: x[1] >= 2) # nur Zeilen mit idx >= 2 behalten
.map(lambda x: x[0])) # idx wieder entfernen
rdd_station_splitlines = rdd_station_filterd.map(
lambda l: (
int(l[:6].strip()), # Station ID
l[6:15], # von_datum
l[15:24], # bis_datum
float(l[24:40].strip()), # stations höhe
float(l[40:53].strip()), # geoBreite
float(l[53:61].strip()), # geoHöhe
l[61:142], # Stationsname
l[142:-1] # Bundesland
))
# Datenschema festlegen
stationschema = StructType(
[
StructField("stationId", IntegerType(), True),
StructField("von_datum", StringType(), True),
StructField("bis_datum", StringType(), True),
StructField("hoehe", FloatType(), True),
StructField("geo_breite", FloatType(), True),
StructField("geo_laenge", FloatType(), True),
StructField("station_name", StringType(), True),
StructField("bundesland", StringType(), True)
]
)
# Data Frame erzeugen
stationframe = spark.createDataFrame(rdd_station_splitlines, schema=stationschema)
stationframe.printSchema()
# Temporäre View erzeugen
stationframe.createOrReplaceTempView("german_stations")
# Data Frame in HDFS speichern
stationframe.write.mode("overwrite").parquet(
HDFSPATH + "home/heiserervalentin/german_stations.parquet"
)
def read_data_from_parquet(spark):
"""
read_data_from_parquet(spark)
"""
df = spark.read.parquet(HDFSPATH + "home/heiserervalentin/german_stations.parquet")
df.createOrReplaceTempView("german_stations")
df.cache()
def sql_querys(spark):
"""
sql_querys(spark)
"""
spark.sql("SELECT * FROM german_stations").show(5, truncate=False)
spark.sql("SELECT COUNT(*) AS Anzahl FROM german_stations").show()
spark.sql("SELECT MAX(geo_breite) FROM german_stations").show()
df = spark.sql("SELECT * FROM german_stations").toPandas()
plt.figure(figsize=[6,6])
plt.scatter(df.geo_laenge, df.geo_breite, marker='.', color = 'r')
plt.show()
def import_produkt_files(spark: SparkSession, scon: SparkContext, path='/data/cdc/hourly/'):
"""
import_produkt_files(spark, scon)
"""
# Daten in RDD einlesen
rdd_produkt = scon.textFile(f"{path}/produkt*")
# Kopfzeile und Leerzeichen filtern
rdd_filterd = rdd_produkt \
.filter(lambda l: l != 'STATIONS_ID;MESS_DATUM;QN_9;TT_TU;RF_TU;eor') \
.map(lambda l: [x.strip() for x in l.split(';')])
# Zeilen in Felder aufteilen
rdd_produkt_splitlines = rdd_filterd.map(
lambda l: (
int(l[0]), # Stat_id
l[1][:8], # Messdatum
int(l[1][8:10]), # Messstunde
int(l[2]), # Qualitätsniveau
float(l[3]), # Lufttemp.
float(l[4]), # rel. Luftfeuchte
int(l[1][0:4]) # jahr
)
)
print(rdd_produkt_splitlines.take(5))
# Datenschema definieren
product_schema = StructType(
[
StructField("stationId", IntegerType(), True),
StructField("date", StringType(), True),
StructField("hour", IntegerType(), True),
StructField("QN_9", IntegerType(), True),
StructField("TT_TU", FloatType(), True),
StructField("RF_TU", FloatType(), True),
StructField("jahr", IntegerType(), True)
]
)
product_frame = spark.createDataFrame(rdd_produkt_splitlines, schema=product_schema)
product_frame.printSchema()
product_frame.createOrReplaceTempView("german_stations_data")
product_frame.write.mode("overwrite").parquet(
HDFSPATH + "home/heiserervalentin/german_stations_data.parquet"
)
def read_product_data_from_parquet(spark):
"""
read_product_data_from_parquet(spark)
"""
df = spark.read.parquet(HDFSPATH + "home/heiserervalentin/german_stations_data.parquet")
df.createOrReplaceTempView("german_stations_data")
df.cache()
def main(scon, spark):
"""
main(scon, spark)
"""
# Daten importieren
import_data(spark, scon)
read_data_from_parquet(spark)
sql_querys(spark)
print("a)")
a(scon, spark)
print("Beispielabfrage: (Alle Stationen:)")
get_all_cdc_stations(spark)
print("Beispielabfrage: (Alle Stationen pro Bundesland)")
get_cdc_stations_per_state(spark)
print("b)")
b(scon, spark)
print("Beispielabfrage: (Alle Daten für eine Station:)")
get_hourly_station(spark, "4271")
print("Beispielabfrage: (Durchschnittliche Temperatur pro Tag für eine Station:)")
avg_temp_per_day(spark, "4271")
import_produkt_files(spark, scon)
read_product_data_from_parquet(spark)
if __name__ == "__main__":
main(scon, spark)
main(scon, spark)