add 9 + 10

2025-11-28 12:04:55 +01:00
parent 068422c1e1
commit 90524173f4
6 changed files with 1083 additions and 1 deletion

Aufgabe 10/Aufgabe10.py Normal file

@@ -0,0 +1,209 @@
from sparkstart import scon, spark
import ghcnd_stations
import matplotlib.pyplot as plt
import time
# a) Scatter plot: all stations (lon/lat)
def plot_all_stations(spark):
q = """
SELECT stationname, latitude, longitude
FROM cdc_stations
WHERE latitude IS NOT NULL AND longitude IS NOT NULL
"""
t0 = time.time()
rows = spark.sql(q).collect()
t1 = time.time()
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Rows: {len(rows)}")
lats = [r['latitude'] for r in rows]
lons = [r['longitude'] for r in rows]
names = [r['stationname'] for r in rows]
plt.figure(figsize=(8,6))
plt.scatter(lons, lats, s=10, alpha=0.6)
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('Alle CDC-Stationen (Scatter)')
plt.grid(True)
plt.show()
# b) Scatter plot: station record length in years as marker size
def plot_station_duration(spark, size_factor=20):
q = """
SELECT
stationname,
latitude,
longitude,
(CAST(CASE WHEN length(to_date) >= 4 THEN substr(to_date,1,4) ELSE year(current_date()) END AS INT)
- CAST(substr(from_date,1,4) AS INT)) AS years
FROM cdc_stations
WHERE latitude IS NOT NULL AND longitude IS NOT NULL
"""
t0 = time.time()
rows = spark.sql(q).collect()
t1 = time.time()
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Rows: {len(rows)}")
lats = [r['latitude'] for r in rows]
lons = [r['longitude'] for r in rows]
years = [r['years'] if r['years'] is not None else 0 for r in rows]
sizes = [max(5, (y+1) * size_factor) for y in years]
plt.figure(figsize=(8,6))
plt.scatter(lons, lats, s=sizes, alpha=0.6)
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('CDC-Stationen: Dauer der Verfuegbarkeit (Größe ~ Jahre)')
plt.grid(True)
plt.show()
def plot_frost_distribution_year(spark, year):
q = f"""
WITH daily_max AS (
SELECT stationid, date, MAX(tt_tu) AS max_temp
FROM cdc_hourly
WHERE length(date) >= 8 AND substr(date,1,4) = '{year}'
GROUP BY stationid, date
),
station_frost AS (
SELECT dm.stationid, SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays
FROM daily_max dm
GROUP BY dm.stationid
)
SELECT sf.frostdays, COUNT(*) AS stations
FROM station_frost sf
GROUP BY sf.frostdays
ORDER BY sf.frostdays
"""
t0 = time.time()
rows = spark.sql(q).collect()
t1 = time.time()
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Distinct frostdays: {len(rows)}")
x = [r['frostdays'] for r in rows]
y = [r['stations'] for r in rows]
plt.figure(figsize=(8,5))
plt.bar(x, y)
plt.xlabel('Anzahl Frosttage im Jahr ' + str(year))
plt.ylabel('Anzahl Stationen')
plt.title(f'Verteilung der Frosttage pro Station im Jahr {year}')
plt.grid(True)
plt.show()
# c2) Frost-day time series for one station with 5- and 20-year averages (SQL window functions)
def plot_station_frost_timeseries(spark, station_name):
q = f"""
WITH daily_max AS (
SELECT stationid, date, MAX(tt_tu) AS max_temp
FROM cdc_hourly
GROUP BY stationid, date
),
yearly AS (
SELECT
dm.stationid,
CAST(substr(dm.date,1,4) AS INT) AS year,
SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays
FROM daily_max dm
GROUP BY dm.stationid, CAST(substr(dm.date,1,4) AS INT)
),
station_yearly AS (
SELECT
y.year,
y.frostdays,
AVG(y.frostdays) OVER (ORDER BY y.year ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg5,
AVG(y.frostdays) OVER (ORDER BY y.year ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg20
FROM yearly y
JOIN cdc_stations s ON y.stationid = s.stationid
WHERE trim(upper(s.stationname)) = '{station_name.upper()}'
ORDER BY y.year
)
SELECT * FROM station_yearly
"""
t0 = time.time()
rows = spark.sql(q).collect()
t1 = time.time()
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Years: {len(rows)}")
if not rows:
print(f"Keine Daten f\u00fcr Station '{station_name}'.")
return
years = [r['year'] for r in rows]
frostdays = [r['frostdays'] for r in rows]
avg5 = [r['avg5'] for r in rows]
avg20 = [r['avg20'] for r in rows]
plt.figure(figsize=(10,5))
plt.plot(years, frostdays, label='Frosttage (Jahr)')
plt.plot(years, avg5, label='5-Jahres-Durchschnitt')
plt.plot(years, avg20, label='20-Jahres-Durchschnitt')
plt.xlabel('Jahr')
plt.ylabel('Anzahl Frosttage')
plt.title(f'Frosttage für Station {station_name}')
plt.legend()
plt.grid(True)
plt.show()
# d) Correlation between station height and frost days per year
def plot_height_frost_correlation(spark):
q = """
WITH daily_max AS (
SELECT stationid, date, MAX(tt_tu) AS max_temp
FROM cdc_hourly
GROUP BY stationid, date
),
yearly AS (
SELECT
dm.stationid,
CAST(substr(dm.date,1,4) AS INT) AS year,
SUM(CASE WHEN dm.max_temp < 0 THEN 1 ELSE 0 END) AS frostdays
FROM daily_max dm
GROUP BY dm.stationid, CAST(substr(dm.date,1,4) AS INT)
),
joined AS (
SELECT y.year, s.height, y.frostdays
FROM yearly y
JOIN cdc_stations s ON y.stationid = s.stationid
),
yearly_corr AS (
SELECT year, corr(height, frostdays) AS corr
FROM joined
GROUP BY year
ORDER BY year
)
SELECT year, corr FROM yearly_corr WHERE corr IS NOT NULL
"""
t0 = time.time()
rows = spark.sql(q).collect()
t1 = time.time()
print(f"Ausfuehrungszeit (SQL): {t1 - t0:.3f}s -- Years with corr: {len(rows)}")
if not rows:
print("Keine Korrelationsdaten verfügbar.")
return
years = [r['year'] for r in rows]
corr = [r['corr'] for r in rows]
plt.figure(figsize=(10,5))
plt.bar(years, corr)
plt.xlabel('Jahr')
plt.ylabel('Korrelationskoeffizient (height vs frostdays)')
plt.title('Korrelation Hoehe vs. Frosttage pro Jahr')
plt.grid(True)
plt.show()
if __name__ == '__main__':
ghcnd_stations.read_ghcnd_from_parquet(spark)
plot_all_stations(spark)
plot_station_duration(spark)
plot_frost_distribution_year(spark, '2010')
plot_station_frost_timeseries(spark, 'KEMPTEN')
plot_height_frost_correlation(spark)
pass
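# Note: the plots above query the cdc_stations and cdc_hourly views, which
# read_ghcnd_from_parquet does not register. A minimal sketch for creating
# them, assuming the parquet files written by Aufgabe 9/Aufgabe9.py (the
# HDFS path below is taken from that script, not defined here):
#
#     CDC_PARQUET = "hdfs://193.174.205.250:54310/home/kramlingermike/"
#     spark.read.parquet(CDC_PARQUET + "cdc_stations.parquet") \
#         .createOrReplaceTempView("cdc_stations")
#     spark.read.parquet(CDC_PARQUET + "cdc_hourly.parquet") \
#         .createOrReplaceTempView("cdc_hourly")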

Aufgabe 10/ghcnd_stations.py Normal file

@@ -0,0 +1,689 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Load stations, countries, inventory and data from GHCND as Dataset.
@author: steger
"""
# pylint: disable=pointless-string-statement
import os
from datetime import date
from time import time
from subprocess import call
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DateType
# =============================================
# run sparkstart.py before to create a session
# =============================================
HDFSPATH = "hdfs://193.174.205.250:54310/"
GHCNDPATH = HDFSPATH + "ghcnd/"
GHCNDHOMEPATH = "/data/ghcnd/"
def conv_elevation(elev):
"""
Convert an elevation value.
-999.9 means there is no value.
Parameters
----------
elev : string
The elevation to convert to float.
Returns
-------
res : numeric
The converted value as float.
"""
elev = elev.strip()
if elev == "-999.9":
res = None
else:
res = float(elev)
return res
def conv_data_value(line, start):
"""
Convert a single data value from a .dly file.
Parameters
----------
line : string
The line with the data value.
start : int
The index at which the value starts.
Returns
-------
res : numeric
The converted data value as int.
"""
return int(line[start:start+5].strip())
def import_ghcnd_stations(scon, spark, path):
"""
Read the station data into a dataframe.
Register it as temporary view and write it to parquet.
Parameters
----------
scon : SparkContext
The spark context.
spark : SparkSession
The SQL session.
path : string
The path where the station file resides.
Returns
-------
stationframe : DataFrame
The Spark DataFrame with the station data.
"""
stationlines = scon.textFile(path + "ghcnd-stations.txt")
stationsplitlines = stationlines.map(
lambda l:
(l[0:2],
l[2:3],
l[0:11],
float(l[12:20].strip()),
float(l[21:30].strip()),
conv_elevation(l[31:37]),
l[41:71]
))
stationschema = StructType([
StructField('countrycode', StringType(), True),
StructField('networkcode', StringType(), True),
StructField('stationid', StringType(), True),
StructField('latitude', FloatType(), True),
StructField('longitude', FloatType(), True),
StructField('elevation', FloatType(), True),
StructField('stationname', StringType(), True)
])
stationframe = spark.createDataFrame(stationsplitlines,
schema=stationschema)
stationframe.createOrReplaceTempView("ghcndstations")
stationframe.write.mode('overwrite').parquet(
GHCNDPATH + "ghcndstations.parquet")
stationframe.cache()
print("Imported GhcndStations")
return stationframe
def import_ghcnd_countries(scon, spark, path):
"""
Read the countries data into a dataframe.
Register it as temporary view and write it to parquet.
Parameters
----------
scon : SparkContext
The spark context.
spark : SparkSession
The SQL session.
path : string
The path where the file with data resides.
Returns
-------
countryframe : DataFrame
The Spark DataFrame with the countries data.
"""
countrylines = scon.textFile(path + "ghcnd-countries.txt")
countrysplitlines = countrylines.map(lambda l: (l[0:2], l[2:50]))
countryschema = StructType([
StructField('countrycode', StringType(), True),
StructField('countryname', StringType(), True)])
countryframe = spark.createDataFrame(countrysplitlines, countryschema)
countryframe.createOrReplaceTempView("ghcndcountries")
countryframe.write.mode('overwrite').parquet(
GHCNDPATH + "ghcndcountries.parquet")
countryframe.cache()
print("Imported GhcndCountries")
return countryframe
def conv_data_line(line):
"""
Convert a data line from GHCND-Datafile (.dly).
Parameters
----------
line : string
String with a data line containing the values for one month.
Returns
-------
list of tuple
List containing a tuple for each data value.
"""
if line == '':
return []
countrycode = line[0:2]
networkcode = line[2:3]
stationid = line[0:11]
year = int(line[11:15])
month = int(line[15:17])
element = line[17:21]
datlst = []
for i in range(0, 31):  # a .dly record holds 31 day slots; missing days are -9999
val = conv_data_value(line, 21 + i*8)
if val != -9999:
datlst.append((countrycode, networkcode, stationid,
year, month, i+1,
date(year, month, i+1),
element,
val))
return datlst
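# Layout of a .dly record assumed by conv_data_value / conv_data_line (as
# documented in the GHCND readme): columns 1-11 station id, 12-15 year,
# 16-17 month, 18-21 element, followed by 31 blocks of 8 characters each
# (a 5-character value plus the MFLAG/QFLAG/SFLAG characters); a value of
# -9999 marks a missing day.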
def read_dly_file(scon, spark, filename):
"""
Read a .dly-file into a data frame.
Parameters
----------
scon : SparkContext
The spark context.
spark : SparkSession
The SQL session.
filename : string
The name and path of the dly-File.
Returns
-------
DataFrame
The DataFrame with the contents of the .dly file.
"""
dly = scon.textFile(filename)
return process_dly_file_lines(spark, dly)
def process_dly_file_lines(spark, lines):
"""
Process the lines of one dly file.
Parameters
----------
spark : SparkSession
The SQL session.
lines : RDD
RDD with one value per line.
Returns
-------
dlyframe : DataFrame
Data Frame containing the data of the file.
"""
dlsplit = lines.flatMap(conv_data_line)
dlyfileschema = StructType([
StructField('countrycode', StringType(), True),
StructField('networkcode', StringType(), True),
StructField('stationid', StringType(), True),
StructField('year', IntegerType(), True),
StructField('month', IntegerType(), True),
StructField('day', IntegerType(), True),
StructField('date', DateType(), True),
StructField('element', StringType(), True),
StructField('value', IntegerType(), True)
])
dlyframe = spark.createDataFrame(dlsplit, dlyfileschema)
return dlyframe
def import_data_rdd_parallel(scon, spark, path):
"""
Import the data files from ghcnd in parallel.
This is much faster on a cluster or a computer with many cores
and enough main memory to hold all the raw data.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
Returns
-------
None.
"""
rdd = scon.textFile(
path+"/ghcnd_all/*.dly", minPartitions=5000)
rddcoa = rdd.coalesce(5000)
rddsplit = rddcoa.flatMap(conv_data_line)
print("Number of data records = " + str(rddsplit.count()))
print("Number of partitions = " + str(rddsplit.getNumPartitions()))
dlyfileschema = StructType([
StructField('countrycode', StringType(), True),
StructField('networkcode', StringType(), True),
StructField('stationid', StringType(), True),
StructField('year', IntegerType(), True),
StructField('month', IntegerType(), True),
StructField('day', IntegerType(), True),
StructField('date', DateType(), True),
StructField('element', StringType(), True),
StructField('value', IntegerType(), True)
])
dlyframe = spark.createDataFrame(rddsplit, dlyfileschema)
dlyframe.show(10)
dlyframe.write.mode('overwrite').parquet(
GHCNDPATH + "ghcnddata.parquet")
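# Note: os.system returns the command's exit status; the directory size
# reported by 'hdfs dfs -du -s' is written to stdout by the command itself.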
print(os.system("hdfs dfs -du -s /ghcnd/ghcnddata.parquet"))
def import_data_rdd_parallel_whole(scon, spark, path):
"""
Import the data files from ghcnd in parallel.
This is much faster on a cluster or a computer with many cores
and enough main memory to hold all the raw data.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
Returns
-------
None.
"""
rdd = scon.wholeTextFiles(
path+"/ghcnd_all/*.dly", minPartitions=5000 )
rddvals = rdd.values()
print("Number of files in GHCND = " + str(rddvals.count()))
rddlen = rddvals.map(len)
print("Number of characters in all files = " +
str(rddlen.reduce(lambda x, y: x + y)))
rddlines = rddvals.flatMap(lambda x: x.split("\n"))
print("Number of lines with data = " + str(rddlines.count()))
rddsplit = rddlines.flatMap(conv_data_line)
print("Number of data records = " + str(rddsplit.count()))
print("Number of partitions = " + str(rddsplit.getNumPartitions()))
dlyfileschema = StructType([
StructField('countrycode', StringType(), True),
StructField('networkcode', StringType(), True),
StructField('stationid', StringType(), True),
StructField('year', IntegerType(), True),
StructField('month', IntegerType(), True),
StructField('day', IntegerType(), True),
StructField('date', DateType(), True),
StructField('element', StringType(), True),
StructField('value', IntegerType(), True)
])
dlyframe = spark.createDataFrame(rddsplit, dlyfileschema)
dlyframe.show(10)
dlyframe.write.mode('overwrite').parquet(
GHCNDPATH + "ghcnddata.parquet")
print(os.system("hdfs dfs -du -s /ghcnd/ghcnddata.parquet"))
"""
Code used to debug a problem that ultimately turned out to be caused by empty lines.
To solve it, the check
if line == '':
return []
was added at the beginning of conv_data_line to filter out empty lines:
noyear = rddsplit.filter(lambda x: not x[3].isnumeric())
noyear.collect()
rddlines1 = rdd.flatMap(lambda x: [(x[0], y) for y in x[1].split("\n")])
print(rddlines1.count())
rddsplit1 = rddlines1.flatMap(convDataLine1)
print(rddsplit1.count())
noyear1 = rddsplit1.filter(lambda x: not x[1][3].isnumeric())
noyear1.collect()
"""
def import_ghcnd_files_extern(scon, spark, path, stationlist, batchsize,
numparts):
"""
Import multiple data files in one batch.
Import batchsize data files in one batch and append the data into
the parquet file.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
path : string
Path of the data files.
stationlist : list
List of all stations to load.
batchsize : int
Number of files to load in one batch.
numparts : int
Number of partitions to write one batch.
Returns
-------
None.
"""
data = None
count = 0
allcount = 0
batchcount = 0
for station in stationlist:
# filename = "file://" + path + "/" + station + ".dly"
filename = path + station + ".dly"
if os.path.isfile(filename):
dly = read_dly_file(scon, spark, "file://" + filename)
if data is not None:
data = data.union(dly)
print("Batch " + str(batchcount) +
" Filenr " + str(count) + " Processing " + filename)
else:
tstart = time()
data = dly
count += 1
if count >= batchsize:
# data = data.sort('countrycode', 'stationid', 'date')
data = data.coalesce(numparts)
tcoalesce = time()
data.write.mode('Append').parquet(
GHCNDPATH + "ghcnddata.parquet")
anzrec = data.count()
twrite = time()
print(
"\n\nBatch " + str(batchcount) +
" #recs " + str(anzrec) +
" #files " + str(allcount) +
" readtime " + str.format("{:f}", tcoalesce - tstart) +
" writetime " + str.format("{:f}", twrite - tcoalesce) +
" recs/sec " +
str.format("{:f}", anzrec / (twrite - tstart)) + "\n\n")
allcount += count
count = 0
batchcount += 1
data = None
else:
print("importGhcndFilesExtern: " + station +
", " + filename + " not found")
if data is not None:
data = data.coalesce(numparts)
data.write.mode('Append').parquet(GHCNDPATH + "ghcnddata.parquet")
def import_all_data(scon, spark, path):
"""
Import all data from GHCND.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
path : string
Path of data files.
Returns
-------
None.
"""
stationlist = spark.sql(
"SELECT stationid AS station \
FROM ghcndstations \
ORDER BY station")
pds = stationlist.toPandas()
import_ghcnd_files_extern(scon, spark, path + "ghcnd_all/",
pds.station, 30, 1)
def import_data_single_files(scon, spark, stationlist, parquetname, path):
"""
Import the data files one by one.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
stationlist : list
List of all stations to import data.
parquetname : string
Name of the parquet file to write the data to.
path : string
Path where the data files reside.
Returns
-------
None.
"""
pds = stationlist.toPandas()
cnt = 0
for station in pds.station:
filename = path + station + ".dly"
if os.path.isfile(filename):
start = time()
dly = read_dly_file(scon, spark, "file://" + filename)
numrec = dly.count()
dly = dly.coalesce(1).sort('element', 'date')
read = time()
dly.write.mode('Append').parquet(GHCNDPATH
+ parquetname + ".parquet")
finish = time()
print(str.format(
"{:8d} ", cnt) + station +
" #rec " + str.format("{:7d}", numrec) +
" read " + str.format("{:f}", read - start) +
" write " + str.format("{:f}", finish - read) +
" write/sec " + str.format("importDataSingleFiles{:f} ",
numrec/(finish - read))
+ " " + filename)
else:
print("#### " + str(cnt) + " File " +
filename + " does not exist ####")
cnt += 1
def check_files(spark):
"""
Check whether the files for some generated station names do not exist.
Parameters
----------
spark : SparkSession
The SQL session.
Returns
-------
None.
"""
stationlist = spark.sql(
"SELECT CONCAT(countrycode, networkcode, stationid) AS station \
FROM ghcndstations \
ORDER BY station")
pds = stationlist.toPandas()
count = 1
for station in pds.station:
filename = "/nfs/home/steger/ghcnd/ghcnd_all/" + station + ".dly"
if os.path.isfile(filename):
# print(str(count) + " " + station)
pass
else:
print(str(count) + " File does not exist: " + filename)
count += 1
"""
Read the inventory data into a dataframe,
register it as temporary view and write it to parquet
"""
def import_ghcnd_inventory(scon, spark, path):
"""
Import inventory information from GHCND.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
path : string
Path for inventory file.
Returns
-------
invframe : DataFrame
Data Frame with inventory data.
"""
invlines = scon.textFile(path + "ghcnd-inventory.txt")
invsplitlines = invlines.map(
lambda l:
(l[0:2],
l[2:3],
l[0:11],
float(l[12:20].strip()),
float(l[21:30].strip()),
l[31:35],
int(l[36:40]),
int(l[41:45])
))
invschema = StructType([
StructField('countrycode', StringType(), True),
StructField('networkcode', StringType(), True),
StructField('stationid', StringType(), True),
StructField('latitude', FloatType(), True),
StructField('longitude', FloatType(), True),
StructField('element', StringType(), True),
StructField('firstyear', IntegerType(), True),
StructField('lastyear', IntegerType(), True)
])
invframe = spark.createDataFrame(invsplitlines, invschema)
invframe.createOrReplaceTempView("ghcndinventory")
invframe.write.mode('overwrite').parquet(
GHCNDPATH + "ghcndinventory.parquet")
invframe.cache()
print("Imported GhcndInventory")
return invframe
def import_ghcnd_all(scon, spark):
"""
Import all files from GHCND.
Parameters
----------
scon : SparkContext
The context.
spark : SparkSession
The SQL session.
Returns
-------
None.
"""
localfilepath = "file://" + GHCNDHOMEPATH
import_ghcnd_countries(scon, spark, localfilepath)
import_ghcnd_stations(scon, spark, localfilepath)
import_ghcnd_inventory(scon, spark, localfilepath)
# import_all_data(scon, spark, GHCNDHOMEPATH)
import_data_rdd_parallel(scon, spark, localfilepath)
def read_ghcnd_from_parquet(spark):
"""
Read all data from the parquet files into Dataframes.
Create temporary views from the parquet files.
Parameters
----------
spark : SparkSession
The SQL Session.
Returns
-------
None.
"""
dfcountries = spark.read.parquet(GHCNDPATH + "ghcndcountries.parquet")
dfcountries.createOrReplaceTempView("ghcndcountries")
dfcountries.cache()
dfstations = spark.read.parquet(GHCNDPATH + "ghcndstations.parquet")
dfstations.createOrReplaceTempView("ghcndstations")
dfstations.cache()
dfinventory = spark.read.parquet(GHCNDPATH + "ghcndinventory.parquet")
dfinventory.createOrReplaceTempView("ghcndinventory")
dfinventory.cache()
dfdata = spark.read.parquet(GHCNDPATH + "ghcnddata.parquet")
dfdata.createOrReplaceTempView("ghcnddata")
dfdata.cache()
def delete_all_parquet_ghcnd():
"""
Delete all parquet files that were imported from GHCND.
Returns
-------
None.
"""
delete_from_hdfs(GHCNDPATH + "ghcndstations.parquet")
delete_from_hdfs(GHCNDPATH + "ghcndcountries.parquet")
delete_from_hdfs(GHCNDPATH + "ghcndinventory.parquet")
delete_from_hdfs(GHCNDPATH + "ghcnddata.parquet")
def delete_from_hdfs(path):
"""
Delete the file in path from HDFS.
Parameters
----------
path : string
Path of the file in HDFS.
Returns
-------
None.
"""
call("hdfs dfs -rm -R " + path,
shell=True)
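# Typical usage (a sketch, assuming the shared session from sparkstart.py):
#
#     from sparkstart import scon, spark
#     import_ghcnd_all(scon, spark)      # one-time import of the raw files
#     read_ghcnd_from_parquet(spark)     # later sessions: just load the views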

Aufgabe 10/sparkstart.py Normal file

@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
Create a Spark configuration and session.
"""
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# connect to cluster
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.executor.memory", '32g')
conf.set("spark.driver.memory", '8g')
conf.set("spark.cores.max", "40")
scon = SparkContext(conf=conf)
spark = SparkSession \
.builder \
.appName("Python Spark SQL") \
.getOrCreate()
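# Note: getOrCreate() picks up the SparkContext created above, so the cluster
# settings from conf also apply to the SQL session; the assignment scripts use
# it via "from sparkstart import scon, spark".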


@@ -133,7 +133,7 @@ def plot_avg_tmax_day(station_name):
days = [row['day'] for row in df_avg]
avg_tmax = [row['avg_tmax'] for row in df_avg]
#TODO: do this with SQL
# 21-day moving average (10 days before, the day itself, 10 days after)
rolling_avg = []
for i in range(len(avg_tmax)):

Aufgabe 9/Aufgabe9.py Normal file

@@ -0,0 +1,141 @@
#!/usr/bin/env python3
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, FloatType
from pyspark.sql import Row
import pyspark.sql.functions as F
import re
CDC_PATH = "/data/cdc/hourly/"
HDFS_HOME = "hdfs://193.174.205.250:54310/"
# a) Read the station metadata and save it as parquet
def a(scon, spark, path=CDC_PATH):
stationlines = scon.textFile(path + "TU_Stundenwerte_Beschreibung_Stationen.txt")
stationlines = stationlines.zipWithIndex().filter(lambda x: x[1] >= 2).map(lambda x: x[0])
stationsplitlines = stationlines.map(lambda l: (
l[0:5].strip(),
l[6:14].strip(),
l[15:23].strip(),
int(l[24:41].strip()),
float(l[42:52].strip()),
float(l[53:61].strip()),
l[61:101].strip(),
l[102:].strip()
))
stationschema = StructType([
StructField('stationid', StringType(), True),
StructField('from_date', StringType(), True),
StructField('to_date', StringType(), True),
StructField('height', IntegerType(), True),
StructField('latitude', FloatType(), True),
StructField('longitude', FloatType(), True),
StructField('stationname', StringType(), True),
StructField('state', StringType(), True)
])
stationframe = spark.createDataFrame(stationsplitlines, schema=stationschema)
stationframe.createOrReplaceTempView("cdc_stations")
outfile = HDFS_HOME + "/home/kramlingermike/" + "cdc_stations.parquet"
stationframe.write.mode('overwrite').parquet(outfile)
stationframe.cache()
# a) Example query
def get_all_cdc_stations(spark):
result = spark.sql(f"""
SELECT *
FROM cdc_stations
ORDER BY stationname
""")
result.show(truncate=False)
# a) Example query
def get_cdc_stations_per_state(spark):
result = spark.sql(f"""
SELECT
state,
COUNT(*) AS count
FROM cdc_stations
GROUP BY state
ORDER BY count DESC
""")
result.show(truncate=False)
def b(scon, spark):
lines = scon.textFile(CDC_PATH + "produkt*")
lines = lines.filter(lambda line: not line.startswith("STATIONS_ID"))
lines = lines.zipWithIndex().filter(lambda x: x[1] >= 0).map(lambda x: x[0])
lines = lines.map(lambda l: l.split(";"))
lines = lines.map(lambda s: (
s[0].strip(),
s[1].strip()[:8],
int(s[1].strip()[8:]),
int(s[2].strip()),
float(s[3].strip()),
float(s[4].strip())
))
schema = StructType([
StructField("stationid", StringType(), True),
StructField("date", StringType(), True),
StructField("hour", IntegerType(), True),
StructField("qn_9", IntegerType(), True),
StructField("tt_tu", FloatType(), True),
StructField("rf_tu", FloatType(), True)
])
df = spark.createDataFrame(lines, schema)
df.createOrReplaceTempView("cdc_hourly")
outfile = HDFS_HOME + "home/kramlingermike/" + "cdc_hourly.parquet"
df.write.mode("overwrite").parquet(outfile)
def get_hourly_station(spark, stationid, limit=20):
result = spark.sql(f"""
SELECT *
FROM cdc_hourly
WHERE stationid = '{stationid}'
ORDER BY date, hour
LIMIT {limit}
""")
result.show(truncate=False)
def avg_temp_per_day(spark, stationid, limit=20):
result = spark.sql(f"""
SELECT date, ROUND(AVG(tt_tu),2) AS avg_temp
FROM cdc_hourly
WHERE stationid = '{stationid}'
GROUP BY date
ORDER BY date
LIMIT {limit}
""")
result.show(truncate=False)
def main(scon, spark):
"""
main(scon, spark)
"""
print("a)")
a(scon, spark)
print("Beispielabfrage: (Alle Stationen:)")
get_all_cdc_stations(spark)
print("Beispielabfrage: (Alle Stationen pro Bundesland)")
get_cdc_stations_per_state(spark)
print("b)")
b(scon, spark)
print("Beispielabfrage: (Alle Daten für eine Station:)")
get_hourly_station(spark, "4271")
print("Beispielabfrage: (Durchschnittliche Temperatur pro Tag für eine Station:)")
avg_temp_per_day(spark, "4271")
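# Minimal entry point (a sketch, assuming the shared session objects from
# sparkstart.py, analogous to Aufgabe10.py):
if __name__ == '__main__':
    from sparkstart import scon, spark
    main(scon, spark)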

Aufgabe 9/sparkstart.py Normal file

@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
"""
Create a Spark configuration and session.
"""
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# connect to cluster
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.executor.memory", '32g')
conf.set("spark.driver.memory", '8g')
conf.set("spark.cores.max", "40")
scon = SparkContext(conf=conf)
spark = SparkSession \
.builder \
.appName("Python Spark SQL") \
.getOrCreate()