Mirror of https://github.com/Vale54321/BigData.git (synced 2025-12-11 09:59:33 +01:00)
add Aufgabe 6-8
Aufgabe 6/main.py (new file, 59 lines)
@@ -0,0 +1,59 @@
def a(scon, spark, path):
    """Aufgabe 6a: read the text file at path into an RDD of lines."""
    rdd = scon.textFile(path)
    return rdd


def b(rdd):
    """Aufgabe 6b: count all characters in the text."""
    print("\n=== Aufgabe 6b ===")
    total_chars = rdd.map(lambda line: len(line)).reduce(lambda a, b: a + b)
    print("Anzahl aller Zeichen im Text:", total_chars)
    return total_chars


def c(rdd, top_n=20):
    """Aufgabe 6c: character frequencies and number of distinct characters."""
    print("\n=== Aufgabe 6c ===")
    chars = rdd.flatMap(lambda line: list(line))
    freq = chars.map(lambda c: (c, 1)).reduceByKey(lambda a, b: a + b)
    top_chars = freq.takeOrdered(top_n, key=lambda x: -x[1])

    print(f"\nTop {top_n} häufigste Zeichen:")
    for ch, count in top_chars:
        print(f"'{ch}': {count}")

    unique_chars = chars.distinct().collect()
    print("\nAnzahl verschiedener Zeichen:", len(unique_chars))
    return freq


def d(rdd, top_n=20):
    """Aufgabe 6d: word frequencies after replacing punctuation with spaces."""
    print("\n=== Aufgabe 6d ===")
    from string import punctuation

    def remove_punct(line):
        return "".join([" " if ch in punctuation else ch for ch in line])

    cleaned = rdd.map(remove_punct)

    words = (cleaned
             .flatMap(lambda line: line.split())
             .map(lambda w: w.lower())
             .filter(lambda w: w != ""))

    word_freq = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
    top_words = word_freq.takeOrdered(top_n, key=lambda x: -x[1])

    print(f"\nTop {top_n} häufigste Wörter:")
    for w, c in top_words:
        print(f"{w}: {c}")

    return word_freq


def e(scon, spark, path, top_n=20):
    """Aufgabe 6e: run the word-count pipeline on a second text."""
    rdd = a(scon, spark, path)
    return d(rdd, top_n)


def main(scon, spark):
    """
    main(scon, spark)
    """
    rdd = a(scon, spark, "/data/texte/test/robinsonCrusoe.txt")
    b(rdd)
    c(rdd)
    d(rdd)
    e(scon, spark, "/data/texte/test/DonQuijote.txt")
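A quick local sanity check of the functions above, assuming pyspark is installed locally and the functions are defined in the session; the two sample lines are invented for illustration and stand in for the texts on the cluster:

from pyspark import SparkContext

scon = SparkContext("local[*]", "Aufgabe6Test")  # local context instead of the cluster
rdd = scon.parallelize(["Hello world, hello Spark!", "A second line."])

b(rdd)           # total number of characters
c(rdd, top_n=5)  # five most frequent characters
d(rdd, top_n=5)  # five most frequent words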
Aufgabe 7/main.py (new file, 92 lines)
@@ -0,0 +1,92 @@
def readAllLanguages(scon, lang_file):
    """Read the language word list as (word, language) pairs for the join."""
    return (scon.textFile(lang_file)
            .map(lambda line: line.strip().split(","))
            .filter(lambda x: len(x) == 2)
            .map(lambda x: (x[1].lower(), x[0].lower())))


def readAllTexts(scon, directory, partitions=1000):
    """Read all text files in the directory as (filename, content) pairs."""
    import os

    rdd = scon.wholeTextFiles(directory, minPartitions=partitions)
    rdd = rdd.map(lambda x: (os.path.basename(x[0]), x[1]))
    return rdd


def clean_and_split(rdd):
    """Split every text into (filename, word) pairs, stripping punctuation."""
    from string import punctuation

    def clean_text(text):
        for ch in punctuation + "\n\t\r":
            text = text.replace(ch, " ")
        return text

    return (rdd.flatMap(lambda x: [(x[0], w.lower())
                                   for w in clean_text(x[1]).split(" ")
                                   if w.strip() != ""]))


def unique_words_per_file(word_rdd):
    """Keep every word only once per file and re-key as (word, filename)."""
    return (word_rdd.distinct()
            .map(lambda x: (x[1], x[0])))


def join_with_languages(word_file_rdd, lang_rdd):
    """Join (word, filename) with (word, language) on the word."""
    return word_file_rdd.join(lang_rdd)


def count_words_per_language(joined_rdd):
    """Count the matched words per (filename, language) pair."""
    return (joined_rdd
            .map(lambda x: ((x[1][0], x[1][1]), 1))
            .reduceByKey(lambda a, b: a + b))


def detect_language_per_file(count_rdd):
    """For each file keep the language with the most matching words."""
    return (count_rdd
            .map(lambda x: (x[0][0], (x[0][1], x[1])))
            .reduceByKey(lambda a, b: a if a[1] > b[1] else b)
            .map(lambda x: (x[0], x[1][0])))


def count_texts_per_language(lang_detected_rdd):
    """Count how many texts were assigned to each language."""
    return (lang_detected_rdd
            .map(lambda x: (x[1], 1))
            .reduceByKey(lambda a, b: a + b)
            .sortBy(lambda x: -x[1]))


def detect_languages(scon, text_dir, lang_file, partitions=1000):
    """
    detect_languages(scon, "/data/texte/txt", "/data/texte/languages.txt")
    """
    import time

    start_time = time.time()

    lang_rdd = readAllLanguages(scon, lang_file)
    texts_rdd = readAllTexts(scon, text_dir, partitions=partitions)

    words_rdd = clean_and_split(texts_rdd)
    unique_rdd = unique_words_per_file(words_rdd)
    joined_rdd = join_with_languages(unique_rdd, lang_rdd)
    counts_rdd = count_words_per_language(joined_rdd)
    detected_rdd = detect_language_per_file(counts_rdd)
    summary_rdd = count_texts_per_language(detected_rdd)

    detected = detected_rdd.collect()
    summary = summary_rdd.collect()

    end_time = time.time()
    duration = end_time - start_time

    print(f"Abgeschlossen in {duration:.2f} Sekunden\n")

    print("Erkannte Sprachen pro Datei:")
    for f, lang in detected[:20]:
        print(f"{f}: {lang}")

    print("\nGesamtanzahl Texte pro Sprache:")
    for lang, count in summary:
        print(f"{lang}: {count}")
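A small in-memory check of the detection pipeline, assuming a running SparkContext scon; the two word lists and the two "files" below are made up for illustration:

lang_rdd = scon.parallelize([("the", "english"), ("and", "english"),
                             ("und", "german"), ("der", "german")])
texts_rdd = scon.parallelize([("a.txt", "the cat and the dog"),
                              ("b.txt", "der Hund und die Katze")])

words_rdd = clean_and_split(texts_rdd)
unique_rdd = unique_words_per_file(words_rdd)
joined_rdd = join_with_languages(unique_rdd, lang_rdd)
counts_rdd = count_words_per_language(joined_rdd)
print(detect_language_per_file(counts_rdd).collect())
# expected: [('a.txt', 'english'), ('b.txt', 'german')] (order may vary)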
Aufgabe 8/ghcnd_stations.py (new file, 689 lines)
@@ -0,0 +1,689 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Load stations, countries, inventory and data from GHCND as Dataset.

@author: steger
"""

# pylint: disable=pointless-string-statement

import os
from datetime import date
from time import time
from subprocess import call
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DateType


# =============================================
# run sparkstart.py before to create a session
# =============================================

HDFSPATH = "hdfs://193.174.205.250:54310/"
GHCNDPATH = HDFSPATH + "ghcnd/"
GHCNDHOMEPATH = "/data/ghcnd/"


def conv_elevation(elev):
    """
    Convert an elevation value.

    -999.9 means there is no value.

    Parameters
    ----------
    elev : string
        The elevation to convert to float.

    Returns
    -------
    res : numeric
        The converted value as float, or None if there is no value.
    """
    elev = elev.strip()
    if elev == "-999.9":
        res = None
    else:
        res = float(elev)
    return res


def conv_data_value(line, start):
    """
    Convert a single data value from a .dly file.

    Parameters
    ----------
    line : string
        The line with the data value.
    start : int
        The index at which the value starts.

    Returns
    -------
    res : numeric
        The converted data value as int.
    """
    return int(line[start:start+5].strip())


def import_ghcnd_stations(scon, spark, path):
    """
    Read the station data into a dataframe.

    Register it as temporary view and write it to parquet.

    Parameters
    ----------
    scon : SparkContext
        The spark context.
    spark : SparkSession
        The SQL session.
    path : string
        The path where the file with the data resides.

    Returns
    -------
    stationframe : DataFrame
        The spark data frame with the stations data.
    """
    stationlines = scon.textFile(path + "ghcnd-stations.txt")
    stationsplitlines = stationlines.map(
        lambda l:
            (l[0:2],
             l[2:3],
             l[0:11],
             float(l[12:20].strip()),
             float(l[21:30].strip()),
             conv_elevation(l[31:37]),
             l[41:71]
             ))
    stationschema = StructType([
        StructField('countrycode', StringType(), True),
        StructField('networkcode', StringType(), True),
        StructField('stationid', StringType(), True),
        StructField('latitude', FloatType(), True),
        StructField('longitude', FloatType(), True),
        StructField('elevation', FloatType(), True),
        StructField('stationname', StringType(), True)
        ])
    stationframe = spark.createDataFrame(stationsplitlines,
                                         schema=stationschema)
    stationframe.createOrReplaceTempView("ghcndstations")
    stationframe.write.mode('overwrite').parquet(
        GHCNDPATH + "ghcndstations.parquet")
    stationframe.cache()
    print("Imported GhcndStations")
    return stationframe


def import_ghcnd_countries(scon, spark, path):
    """
    Read the countries data into a dataframe.

    Register it as temporary view and write it to parquet.

    Parameters
    ----------
    scon : SparkContext
        The spark context.
    spark : SparkSession
        The SQL session.
    path : string
        The path where the file with data resides.

    Returns
    -------
    countryframe : DataFrame
        The spark data frame with the countries data.
    """
    countrylines = scon.textFile(path + "ghcnd-countries.txt")
    countrysplitlines = countrylines.map(lambda l: (l[0:2], l[2:50]))
    countryschema = StructType([
        StructField('countrycode', StringType(), True),
        StructField('countryname', StringType(), True)])
    countryframe = spark.createDataFrame(countrysplitlines, countryschema)
    countryframe.createOrReplaceTempView("ghcndcountries")
    countryframe.write.mode('overwrite').parquet(
        GHCNDPATH + "ghcndcountries.parquet")
    countryframe.cache()
    print("Imported GhcndCountries")
    return countryframe


def conv_data_line(line):
    """
    Convert a data line from a GHCND data file (.dly).

    Parameters
    ----------
    line : string
        String with a data line containing the values for one month.

    Returns
    -------
    list of tuple
        List containing a tuple for each data value.
    """
    if line == '':
        return []

    countrycode = line[0:2]
    networkcode = line[2:3]
    stationid = line[0:11]
    year = int(line[11:15])
    month = int(line[15:17])
    element = line[17:21]
    datlst = []
    # each line holds 31 day slots of 8 characters; absent days are -9999
    for i in range(0, 31):
        val = conv_data_value(line, 21 + i*8)
        if val != -9999:
            datlst.append((countrycode, networkcode, stationid,
                           year, month, i+1,
                           date(year, month, i+1),
                           element,
                           val))
    return datlst


def read_dly_file(scon, spark, filename):
    """
    Read a .dly file into a data frame.

    Parameters
    ----------
    scon : SparkContext
        The spark context.
    spark : SparkSession
        The SQL session.
    filename : string
        The name and path of the .dly file.

    Returns
    -------
    DataFrame
        The data frame with the contents of the .dly file.
    """
    dly = scon.textFile(filename)
    return process_dly_file_lines(spark, dly)


def process_dly_file_lines(spark, lines):
    """
    Process the lines of one .dly file.

    Parameters
    ----------
    spark : SparkSession
        The SQL session.
    lines : RDD
        RDD with one value per line.

    Returns
    -------
    dlyframe : DataFrame
        Data frame containing the data of the file.
    """
    dlsplit = lines.flatMap(conv_data_line)
    dlyfileschema = StructType([
        StructField('countrycode', StringType(), True),
        StructField('networkcode', StringType(), True),
        StructField('stationid', StringType(), True),
        StructField('year', IntegerType(), True),
        StructField('month', IntegerType(), True),
        StructField('day', IntegerType(), True),
        StructField('date', DateType(), True),
        StructField('element', StringType(), True),
        StructField('value', IntegerType(), True)
        ])
    dlyframe = spark.createDataFrame(dlsplit, dlyfileschema)
    return dlyframe


def import_data_rdd_parallel(scon, spark, path):
    """
    Import the data files from GHCND in parallel.

    This is much faster on a cluster or a computer with many cores
    and enough main memory to hold all the raw data.

    Parameters
    ----------
    scon : SparkContext
        The context.
    spark : SparkSession
        The SQL session.
    path : string
        Path of the data files.

    Returns
    -------
    None.
    """
    rdd = scon.textFile(
        path + "/ghcnd_all/*.dly", minPartitions=5000)
    rddcoa = rdd.coalesce(5000)

    rddsplit = rddcoa.flatMap(conv_data_line)
    print("Number of data records = " + str(rddsplit.count()))
    print("Number of partitions = " + str(rddsplit.getNumPartitions()))

    dlyfileschema = StructType([
        StructField('countrycode', StringType(), True),
        StructField('networkcode', StringType(), True),
        StructField('stationid', StringType(), True),
        StructField('year', IntegerType(), True),
        StructField('month', IntegerType(), True),
        StructField('day', IntegerType(), True),
        StructField('date', DateType(), True),
        StructField('element', StringType(), True),
        StructField('value', IntegerType(), True)
        ])
    dlyframe = spark.createDataFrame(rddsplit, dlyfileschema)

    dlyframe.show(10)

    dlyframe.write.mode('overwrite').parquet(
        GHCNDPATH + "ghcnddata.parquet")
    print(os.system("hdfs dfs -du -s /ghcnd/ghcnddata.parquet"))


def import_data_rdd_parallel_whole(scon, spark, path):
    """
    Import the data files from GHCND in parallel using wholeTextFiles.

    This is much faster on a cluster or a computer with many cores
    and enough main memory to hold all the raw data.

    Parameters
    ----------
    scon : SparkContext
        The context.
    spark : SparkSession
        The SQL session.
    path : string
        Path of the data files.

    Returns
    -------
    None.
    """
    rdd = scon.wholeTextFiles(
        path + "/ghcnd_all/*.dly", minPartitions=5000)

    rddvals = rdd.values()
    print("Number of files in GHCND = " + str(rddvals.count()))
    rddlen = rddvals.map(len)
    print("Number of characters in all files = " +
          str(rddlen.reduce(lambda x, y: x + y)))

    rddlines = rddvals.flatMap(lambda x: x.split("\n"))
    print("Number of lines with data = " + str(rddlines.count()))

    rddsplit = rddlines.flatMap(conv_data_line)
    print("Number of data records = " + str(rddsplit.count()))
    print("Number of partitions = " + str(rddsplit.getNumPartitions()))

    dlyfileschema = StructType([
        StructField('countrycode', StringType(), True),
        StructField('networkcode', StringType(), True),
        StructField('stationid', StringType(), True),
        StructField('year', IntegerType(), True),
        StructField('month', IntegerType(), True),
        StructField('day', IntegerType(), True),
        StructField('date', DateType(), True),
        StructField('element', StringType(), True),
        StructField('value', IntegerType(), True)
        ])
    dlyframe = spark.createDataFrame(rddsplit, dlyfileschema)

    dlyframe.show(10)

    dlyframe.write.mode('overwrite').parquet(
        GHCNDPATH + "ghcnddata.parquet")
    print(os.system("hdfs dfs -du -s /ghcnd/ghcnddata.parquet"))


"""
Code for testing problems that ultimately turned out to be caused by empty
lines. To solve the problem the check

    if line == '':
        return []

was added at the beginning of conv_data_line to filter out empty lines:

noyear = rddsplit.filter(lambda x: not x[3].isnumeric())
noyear.collect()

rddlines1 = rdd.flatMap(lambda x: [(x[0], y) for y in x[1].split("\n")])
print(rddlines1.count())

rddsplit1 = rddlines1.flatMap(convDataLine1)
print(rddsplit1.count())

noyear1 = rddsplit1.filter(lambda x: not x[1][3].isnumeric())
noyear1.collect()
"""


def import_ghcnd_files_extern(scon, spark, path, stationlist, batchsize,
                              numparts):
    """
    Import multiple data files in one batch.

    Import batchsize data files in one batch and append the data to
    the parquet file.

    Parameters
    ----------
    scon : SparkContext
        The context.
    spark : SparkSession
        The SQL session.
    path : string
        Path of the data files.
    stationlist : list
        List of all stations to load.
    batchsize : int
        Number of files to load in one batch.
    numparts : int
        Number of partitions to write one batch.

    Returns
    -------
    None.
    """
    data = None
    count = 0
    allcount = 0
    batchcount = 0
    for station in stationlist:
        # filename = "file://" + path + "/" + station + ".dly"
        filename = path + station + ".dly"
        if os.path.isfile(filename):
            dly = read_dly_file(scon, spark, "file://" + filename)
            if data is not None:
                data = data.union(dly)
                print("Batch " + str(batchcount) +
                      " Filenr " + str(count) + " Processing " + filename)
            else:
                tstart = time()
                data = dly
            count += 1
            if count >= batchsize:
                # data = data.sort('countrycode', 'stationid', 'date')
                data = data.coalesce(numparts)
                tcoalesce = time()
                data.write.mode('Append').parquet(
                    GHCNDPATH + "ghcnddata.parquet")
                anzrec = data.count()
                twrite = time()
                print(
                    "\n\nBatch " + str(batchcount) +
                    " #recs " + str(anzrec) +
                    " #files " + str(allcount) +
                    " readtime " + str.format("{:f}", tcoalesce - tstart) +
                    " writetime " + str.format("{:f}", twrite - tcoalesce) +
                    " recs/sec " +
                    str.format("{:f}", anzrec / (twrite - tstart)) + "\n\n")
                allcount += count
                count = 0
                batchcount += 1
                data = None
        else:
            print("importGhcndFilesExtern: " + station +
                  ", " + filename + " not found")
    if data is not None:
        data = data.coalesce(numparts)
        data.write.mode('Append').parquet(GHCNDPATH + "ghcnddata.parquet")


def import_all_data(scon, spark, path):
    """
    Import all data from GHCND.

    Parameters
    ----------
    scon : SparkContext
        The context.
    spark : SparkSession
        The SQL session.
    path : string
        Path of data files.

    Returns
    -------
    None.
    """
    stationlist = spark.sql(
        "SELECT stationid AS station \
         FROM ghcndstations \
         ORDER BY station")
    pds = stationlist.toPandas()
    import_ghcnd_files_extern(scon, spark, path + "ghcnd_all/",
                              pds.station, 30, 1)


def import_data_single_files(scon, spark, stationlist, parquetname, path):
    """
    Import the data files one by one.

    Parameters
    ----------
    scon : SparkContext
        The context.
    spark : SparkSession
        The SQL session.
    stationlist : DataFrame
        Data frame with the stations to import (column station).
    parquetname : string
        Name of the parquet file to write the data to.
    path : string
        Path where the data files reside.

    Returns
    -------
    None.
    """
    pds = stationlist.toPandas()
    cnt = 0
    for station in pds.station:
        filename = path + station + ".dly"
        if os.path.isfile(filename):
            start = time()
            dly = read_dly_file(scon, spark, "file://" + filename)
            numrec = dly.count()
            dly = dly.coalesce(1).sort('element', 'date')
            read = time()
            dly.write.mode('Append').parquet(GHCNDPATH
                                             + parquetname + ".parquet")
            finish = time()
            print(str.format(
                "{:8d} ", cnt) + station +
                " #rec " + str.format("{:7d}", numrec) +
                " read " + str.format("{:f}", read - start) +
                " write " + str.format("{:f}", finish - read) +
                " write/sec " + str.format("{:f} ",
                                           numrec/(finish - read))
                + " " + filename)
        else:
            print("#### " + str(cnt) + " File " +
                  filename + " does not exist ####")
        cnt += 1


def check_files(spark):
    """
    Check whether files for the generated station names are missing.

    Parameters
    ----------
    spark : SparkSession
        The SQL session.

    Returns
    -------
    None.
    """
    stationlist = spark.sql(
        "SELECT CONCAT(countrycode, networkcode, stationid) AS station \
         FROM ghcndstations \
         ORDER BY station")
    pds = stationlist.toPandas()
    count = 1
    for station in pds.station:
        filename = "/nfs/home/steger/ghcnd/ghcnd_all/" + station + ".dly"
        if os.path.isfile(filename):
            # print(str(count) + " " + station)
            pass
        else:
            print(str(count) + " File does not exist: " + filename)
        count += 1


"""
Read the inventory data into a dataframe,
register it as temporary view and write it to parquet
"""


def import_ghcnd_inventory(scon, spark, path):
    """
    Import inventory information from GHCND.

    Parameters
    ----------
    scon : SparkContext
        The context.
    spark : SparkSession
        The SQL session.
    path : string
        Path of the inventory file.

    Returns
    -------
    invframe : DataFrame
        Data frame with the inventory data.
    """
    invlines = scon.textFile(path + "ghcnd-inventory.txt")
    invsplitlines = invlines.map(
        lambda l:
            (l[0:2],
             l[2:3],
             l[0:11],
             float(l[12:20].strip()),
             float(l[21:30].strip()),
             l[31:35],
             int(l[36:40]),
             int(l[41:45])
             ))
    invschema = StructType([
        StructField('countrycode', StringType(), True),
        StructField('networkcode', StringType(), True),
        StructField('stationid', StringType(), True),
        StructField('latitude', FloatType(), True),
        StructField('longitude', FloatType(), True),
        StructField('element', StringType(), True),
        StructField('firstyear', IntegerType(), True),
        StructField('lastyear', IntegerType(), True)
        ])
    invframe = spark.createDataFrame(invsplitlines, invschema)
    invframe.createOrReplaceTempView("ghcndinventory")
    invframe.write.mode('overwrite').parquet(
        GHCNDPATH + "ghcndinventory.parquet")
    invframe.cache()
    print("Imported GhcndInventory")
    return invframe


def import_ghcnd_all(scon, spark):
    """
    Import all files from GHCND.

    Parameters
    ----------
    scon : SparkContext
        The context.
    spark : SparkSession
        The SQL session.

    Returns
    -------
    None.
    """
    localfilepath = "file://" + GHCNDHOMEPATH
    import_ghcnd_countries(scon, spark, localfilepath)
    import_ghcnd_stations(scon, spark, localfilepath)
    import_ghcnd_inventory(scon, spark, localfilepath)
    # import_all_data(scon, spark, GHCNDHOMEPATH)
    import_data_rdd_parallel(scon, spark, localfilepath)


def read_ghcnd_from_parquet(spark):
    """
    Read all data from the parquet files into data frames.

    Create temporary views from the parquet files.

    Parameters
    ----------
    spark : SparkSession
        The SQL session.

    Returns
    -------
    None.
    """
    dfcountries = spark.read.parquet(GHCNDPATH + "ghcndcountries.parquet")
    dfcountries.createOrReplaceTempView("ghcndcountries")
    dfcountries.cache()

    dfstations = spark.read.parquet(GHCNDPATH + "ghcndstations.parquet")
    dfstations.createOrReplaceTempView("ghcndstations")
    dfstations.cache()

    dfinventory = spark.read.parquet(GHCNDPATH + "ghcndinventory.parquet")
    dfinventory.createOrReplaceTempView("ghcndinventory")
    dfinventory.cache()

    dfdata = spark.read.parquet(GHCNDPATH + "ghcnddata.parquet")
    dfdata.createOrReplaceTempView("ghcnddata")
    dfdata.cache()


def delete_all_parquet_ghcnd():
    """
    Delete all parquet files that were imported from GHCND.

    Returns
    -------
    None.
    """
    delete_from_hdfs(GHCNDPATH + "ghcndstations.parquet")
    delete_from_hdfs(GHCNDPATH + "ghcndcountries.parquet")
    delete_from_hdfs(GHCNDPATH + "ghcndinventory.parquet")
    delete_from_hdfs(GHCNDPATH + "ghcnddata.parquet")


def delete_from_hdfs(path):
    """
    Delete the file in path from HDFS.

    Parameters
    ----------
    path : string
        Path of the file in HDFS.

    Returns
    -------
    None.
    """
    call("hdfs dfs -rm -R " + path,
         shell=True)
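To illustrate the fixed-width layout that conv_data_line expects (station id, year, month and element in the first 21 characters, followed by 31 day slots of 8 characters each, of which only the 5-character value part is read), here is a minimal check using the functions above; the station id and the values in the sample line are invented for illustration:

sample = "GME00111111196801TMAX" + ("  -25   " * 2) + ("-9999   " * 29)
records = conv_data_line(sample)
print(records[:2])
# expected: two tuples for 1968-01-01 and 1968-01-02 with element 'TMAX'
# and value -25 (tenths of a degree Celsius); the -9999 slots are skipped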
Aufgabe 8/main.py (new file, 220 lines)
@@ -0,0 +1,220 @@
from sparkstart import scon, spark
import ghcnd_stations
import matplotlib.pyplot as plt
import time


# a) list of all stations, sorted by station name
def get_all_stations():
    start = time.time()
    result = spark.sql("SELECT * FROM stations ORDER BY name")
    result.show()
    end = time.time()
    print(f"Zeit: {end - start}")
    # second run
    start = time.time()
    result = spark.sql("SELECT * FROM stations ORDER BY name")
    result.show()
    end = time.time()
    print(f"Zeit zweite Ausführung: {end - start}")


# b) number of stations per country
def get_station_count_per_country():
    start = time.time()
    result = spark.sql("""
        SELECT c.country_code, c.name, COUNT(s.id) as count
        FROM stations s
        JOIN ghcndcountries c ON s.country = c.country_code
        GROUP BY c.country_code, c.name
        ORDER BY count DESC
    """)
    result.show(truncate=False)
    end = time.time()
    print(f"Zeit: {end - start}")
    # second run
    start = time.time()
    result = spark.sql("""
        SELECT c.country_code, c.name, COUNT(s.id) as count
        FROM stations s
        JOIN ghcndcountries c ON s.country = c.country_code
        GROUP BY c.country_code, c.name
        ORDER BY count DESC
    """)
    result.show(truncate=False)
    end = time.time()
    print(f"Zeit zweite: {end - start}")


# c) stations in Germany
def get_german_stations():
    start = time.time()
    result = spark.sql(
        "SELECT * FROM stations WHERE country = 'GM' ORDER BY name")
    result.show()
    end = time.time()
    print(f"Zeit: {end - start}")
    # second run
    start = time.time()
    result = spark.sql(
        "SELECT * FROM stations WHERE country = 'GM' ORDER BY name")
    result.show()
    end = time.time()
    print(f"Zeit zweite: {end - start}")


# d) plot TMAX and TMIN for one station and year
def plot_temp_day(station_name, year):
    # look up the station id
    station_id = spark.sql(
        f"SELECT id FROM stations WHERE name = '{station_name}'"
    ).collect()[0][0]
    # filter the data
    df_filtered = spark.sql(f"""
        SELECT date, TMAX, TMIN FROM ghcnd_data
        WHERE station = '{station_id}' AND year(date) = {year}
        ORDER BY date
    """).toPandas()
    # convert temperatures from tenths of a degree to degrees Celsius
    df_filtered['TMAX'] /= 10
    df_filtered['TMIN'] /= 10
    # day of the year
    df_filtered['day_of_year'] = df_filtered['date'].dt.dayofyear
    plt.plot(df_filtered['day_of_year'], df_filtered['TMAX'], 'r', label='TMAX')
    plt.plot(df_filtered['day_of_year'], df_filtered['TMIN'], 'b', label='TMIN')
    plt.xlabel('Tag des Jahres')
    plt.ylabel('Temperatur (°C)')
    plt.title(f'{station_name} {year}')
    plt.legend()
    plt.show()


# e) total precipitation per year for one station
def plot_precip_year(station_name):
    station_id = spark.sql(
        f"SELECT id FROM stations WHERE name = '{station_name}'"
    ).collect()[0][0]
    df_precip = spark.sql(f"""
        SELECT year(date) as year, SUM(PRCP)/10 as total_precip
        FROM ghcnd_data
        WHERE station = '{station_id}'
        GROUP BY year(date)
        ORDER BY year
    """).toPandas()
    plt.bar(df_precip['year'], df_precip['total_precip'])
    plt.xlabel('Jahr')
    plt.ylabel('Niederschlag (mm)')
    plt.title(f'Gesamt-Niederschlag {station_name}')
    plt.show()


# f) average TMAX per day of the year, with a 21-day rolling mean
def plot_avg_tmax_day(station_name):
    station_id = spark.sql(
        f"SELECT id FROM stations WHERE name = '{station_name}'"
    ).collect()[0][0]
    df_avg = spark.sql(f"""
        SELECT dayofyear(date) as day, AVG(TMAX)/10 as avg_tmax
        FROM ghcnd_data
        WHERE station = '{station_id}'
        GROUP BY dayofyear(date)
        ORDER BY day
    """).toPandas()
    # 21-day rolling mean
    df_avg['rolling_avg'] = df_avg['avg_tmax'].rolling(21, center=True).mean()
    plt.plot(df_avg['day'], df_avg['avg_tmax'], label='Täglich')
    plt.plot(df_avg['day'], df_avg['rolling_avg'], label='21-Tage')
    plt.xlabel('Tag des Jahres')
    plt.ylabel('Durchschnitt TMAX (°C)')
    plt.title(f'Durchschnitt TMAX {station_name}')
    plt.legend()
    plt.show()


# g) average TMAX and TMIN per year for one station
def plot_temp_year(station_name):
    station_id = spark.sql(
        f"SELECT id FROM stations WHERE name = '{station_name}'"
    ).collect()[0][0]
    df_temp = spark.sql(f"""
        SELECT year(date) as year, AVG(TMAX)/10 as avg_tmax, AVG(TMIN)/10 as avg_tmin
        FROM ghcnd_data
        WHERE station = '{station_id}'
        GROUP BY year(date)
        ORDER BY year
    """).toPandas()
    plt.plot(df_temp['year'], df_temp['avg_tmax'], 'r', label='TMAX')
    plt.plot(df_temp['year'], df_temp['avg_tmin'], 'b', label='TMIN')
    plt.xlabel('Jahr')
    plt.ylabel('Temperatur (°C)')
    plt.title(f'Temperatur {station_name}')
    plt.legend()
    plt.show()


# h) average TMAX per year with a 20-year rolling mean
def plot_tmax_trend(station_name):
    station_id = spark.sql(
        f"SELECT id FROM stations WHERE name = '{station_name}'"
    ).collect()[0][0]
    df_trend = spark.sql(f"""
        SELECT year(date) as year, AVG(TMAX)/10 as avg_tmax
        FROM ghcnd_data
        WHERE station = '{station_id}'
        GROUP BY year(date)
        ORDER BY year
    """).toPandas()
    # 20-year rolling mean
    df_trend['rolling_avg'] = df_trend['avg_tmax'].rolling(20, center=True).mean()
    plt.plot(df_trend['year'], df_trend['avg_tmax'], label='Jährlich')
    plt.plot(df_trend['year'], df_trend['rolling_avg'], label='20-Jahre')
    plt.xlabel('Jahr')
    plt.ylabel('Durchschnitt TMAX (°C)')
    plt.title(f'TMAX Trend {station_name}')
    plt.legend()
    plt.show()


# i) correlation of TMIN and TMAX per year
def plot_corr_temp(station_name):
    station_id = spark.sql(
        f"SELECT id FROM stations WHERE name = '{station_name}'"
    ).collect()[0][0]
    df_corr = spark.sql(f"""
        SELECT year(date) as year, corr(TMIN, TMAX) as correlation
        FROM (
            SELECT date, TMIN, TMAX
            FROM ghcnd_data
            WHERE station = '{station_id}' AND TMIN IS NOT NULL AND TMAX IS NOT NULL
        )
        GROUP BY year(date)
        ORDER BY year
    """).toPandas()
    plt.plot(df_corr['year'], df_corr['correlation'])
    plt.xlabel('Jahr')
    plt.ylabel('Korrelation TMIN-TMAX')
    plt.title(f'Korrelation {station_name}')
    plt.show()


def main(scon, spark):
    # load the data
    ghcnd_stations.read_ghcnd_from_parquet(spark)

    # a) list of all stations
    get_all_stations()

    # b) number of stations per country
    get_station_count_per_country()

    # c) stations in Germany
    get_german_stations()

    # d) plots for Kempten, Hohenpeissenberg, Zugspitze
    plot_temp_day('KEMPTEN', 2020)
    plot_temp_day('HOHENPEISSENBERG', 2020)
    plot_temp_day('ZUGSPITZE', 2020)

    # e) precipitation
    plot_precip_year('KEMPTEN')
    plot_precip_year('HOHENPEISSENBERG')
    plot_precip_year('ZUGSPITZE')

    # f) average TMAX
    plot_avg_tmax_day('KEMPTEN')
    plot_avg_tmax_day('HOHENPEISSENBERG')
    plot_avg_tmax_day('ZUGSPITZE')

    # g) temperature per year
    plot_temp_year('KEMPTEN')
    plot_temp_year('HOHENPEISSENBERG')
    plot_temp_year('ZUGSPITZE')

    # h) TMAX trend
    plot_tmax_trend('KEMPTEN')
    plot_tmax_trend('HOHENPEISSENBERG')
    plot_tmax_trend('ZUGSPITZE')

    # i) correlation
    plot_corr_temp('KEMPTEN')
    plot_corr_temp('HOHENPEISSENBERG')
    plot_corr_temp('ZUGSPITZE')


if __name__ == "__main__":
    main(scon, spark)
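The queries above read from views named stations and ghcnd_data with one column per element (TMAX, TMIN, PRCP), while read_ghcnd_from_parquet registers the station view as ghcndstations and the measurement data as the long-format ghcnddata view (one row per station, date and element). A minimal sketch of how matching views could be defined on top of those registered views; the target view and column names (stations, ghcnd_data, id, name, country, station) simply mirror the ones used in the queries above and are an assumption, not part of the repository:

def register_wide_views(spark):
    # stations view with the column names used by the queries above
    spark.sql("""
        CREATE OR REPLACE TEMPORARY VIEW stations AS
        SELECT stationid AS id,
               stationname AS name,
               countrycode AS country,
               latitude, longitude, elevation
        FROM ghcndstations
    """)
    # pivot the long-format data (element/value) into one column per element
    spark.sql("""
        CREATE OR REPLACE TEMPORARY VIEW ghcnd_data AS
        SELECT stationid AS station,
               date,
               MAX(CASE WHEN element = 'TMAX' THEN value END) AS TMAX,
               MAX(CASE WHEN element = 'TMIN' THEN value END) AS TMIN,
               MAX(CASE WHEN element = 'PRCP' THEN value END) AS PRCP
        FROM ghcnddata
        GROUP BY stationid, date
    """)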
Aufgabe 8/sparkstart.py (new file, 22 lines)
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-

"""
Create a Spark configuration and session.
"""

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# connect to the cluster
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.executor.memory", '32g')
conf.set("spark.driver.memory", '8g')
conf.set("spark.cores.max", "40")
scon = SparkContext(conf=conf)


spark = SparkSession \
    .builder \
    .appName("Python Spark SQL") \
    .getOrCreate()
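The scripts in Aufgabe 8 get their context and session by importing this module, as Aufgabe 8/main.py does. A minimal interactive use, assuming the GHCND parquet files have already been written to the cluster, might look like this:

from sparkstart import scon, spark
import ghcnd_stations

ghcnd_stations.read_ghcnd_from_parquet(spark)
spark.sql("SELECT COUNT(*) FROM ghcndstations").show()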