Mirror of https://github.com/Vale54321/BigData.git (synced 2025-12-11 09:59:33 +01:00)
Aufgabe 5
145
Aufgabe 5/main.py
Normal file
@@ -0,0 +1,145 @@
import os
import time
from datetime import datetime
from typing import Iterable, List, Optional, Tuple

from sparkstart import scon
import pandas as pd
import matplotlib.pyplot as plt

def _safe_split(line: str) -> List[str]:
    # Split one raw CSV line into its fields.
    return line.rstrip("\n").split(",")

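The column indices used below match the SBS-1 / BaseStation CSV layout that ADS-B receivers commonly emit; the source does not document the schema, so this is an assumption and the sample line is illustrative only:

    # Hypothetical SBS-1 row (field 1 = transmission type, field 4 = hex ident,
    # fields 6/7 = generation date and time):
    # MSG,3,1,1,3C6589,1,2022/12/03,14:05:07.000,2022/12/03,14:05:07.050,...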
def count_unique_aircraft(
    sc,
    filepath: str,
    granularity: str = "hour",
    partitions: Optional[int] = None,
) -> List[Tuple[datetime, int]]:
    """Count distinct aircraft per time slot in an ADS-B dump and plot the series."""
    assert granularity in {"hour", "day"}, "granularity must be 'hour' or 'day'"

    lines = sc.textFile(filepath)

    parts = lines.map(_safe_split)
    if partitions:
        # repartition() rather than coalesce(): coalesce can only reduce the
        # partition count, so requesting more partitions than the input file
        # provides would silently be a no-op and skew the partition benchmark.
        parts = parts.repartition(partitions)

    def valid_row(cols: list) -> bool:
        # Keep only rows whose transmission-type field is 2 or 3
        # (surface/airborne position reports in the assumed SBS-1 format).
        try:
            if len(cols) < 8:
                return False
            msg_type = cols[1].strip()
            return msg_type in {"2", "3"}
        except Exception:
            return False

    filtered = parts.filter(valid_row)

    def to_slot_aircraft(cols: list) -> Optional[Tuple[str, str]]:
        # Map a row to a (time slot, aircraft id) pair; rows without a usable
        # timestamp or aircraft id are dropped by returning None.
        try:
            aircraft = cols[4].strip()
            date_s = cols[6].strip()
            time_s = cols[7].strip()
            if granularity == "day":
                slot = date_s
            else:  # hour
                hh = time_s.split(":")[0] if ":" in time_s else time_s[:2]
                if not hh:
                    return None
                slot = f"{date_s} {hh}:00"
            if not aircraft:
                return None
            return (slot, aircraft)
        except Exception:
            return None

    slot_aircraft = filtered.map(to_slot_aircraft).filter(lambda x: x is not None)

    # distinct() deduplicates (slot, aircraft) pairs so each aircraft is
    # counted at most once per time slot.
    unique_pairs = slot_aircraft.distinct()

    counts_by_slot = unique_pairs.map(lambda sa: (sa[0], 1)).reduceByKey(lambda a, b: a + b)

    sorted_counts = counts_by_slot.sortByKey(ascending=True)

    def to_dt_count(slot_count: Tuple[str, int]) -> Optional[Tuple[datetime, int]]:
        # Parse the slot string back into a datetime for plotting.
        slot, cnt = slot_count
        try:
            if granularity == "day":
                dt = datetime.strptime(slot, "%Y/%m/%d")
            else:
                dt = datetime.strptime(slot, "%Y/%m/%d %H:%M")
            return (dt, int(cnt))
        except (ValueError, TypeError):
            return None

    dt_counts = sorted_counts.map(to_dt_count).filter(lambda x: x is not None).collect()

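The distinct-then-count pattern above can be sanity-checked on a toy RDD; a minimal sketch using the existing scon context (toy data, not from the source):

    pairs = scon.parallelize([("10:00", "A"), ("10:00", "A"), ("10:00", "B"), ("11:00", "A")])
    counts = pairs.distinct().map(lambda sa: (sa[0], 1)).reduceByKey(lambda a, b: a + b)
    print(sorted(counts.collect()))  # [('10:00', 2), ('11:00', 1)]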
    if dt_counts:
        dt_counts.sort(key=lambda x: x[0])
        df = pd.DataFrame({
            "datetime": [d for d, _ in dt_counts],
            "count": [c for _, c in dt_counts],
        })
        df = df.sort_values("datetime")
        plt.figure(figsize=(10, 4))
        plt.plot(df["datetime"], df["count"], marker="o", linestyle="-", color="tab:blue")
        plt.title(f"Distinct aircraft per {granularity}")
        plt.xlabel("Time")
        plt.ylabel("# Aircraft")
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

    return dt_counts

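plt.show() blocks until the window is closed and does nothing useful on a headless cluster node; if file output is preferred, a minimal sketch replacing the plt.show() call (the output path is an assumption):

    # Hypothetical headless variant: write the figure next to this script.
    out_png = os.path.join(os.path.dirname(__file__), f"aircraft_per_{granularity}.png")
    plt.savefig(out_png, dpi=150)
    plt.close()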
def benchmark_partitions(
    sc,
    filepath: str,
    granularity: str = "hour",
    partition_list: Iterable[int] = (5, 50, 500),
) -> List[Tuple[int, float]]:
    results: List[Tuple[int, float]] = []
    for p in partition_list:
        t0 = time.perf_counter()
        # Note: count_unique_aircraft() also renders a plot; the blocking
        # plt.show() call is included in the measured wall time.
        _ = count_unique_aircraft(sc, filepath, granularity=granularity, partitions=p)
        dt = time.perf_counter() - t0
        print(f"Partitions={p}: wall time {dt:.2f} s")
        results.append((p, dt))

    # Save a quick Markdown table next to this script.
    out_dir = os.path.dirname(__file__)
    md_path = os.path.join(out_dir, f"bench_partitions_{granularity}.md")
    try:
        with open(md_path, "w", encoding="utf-8") as f:
            f.write("| Partitions | Wall time (s) |\n|---:|---:|\n")
            for p, dt in results:
                f.write(f"| {p} | {dt:.2f} |\n")
        print(f"Benchmark table saved: {md_path}")
    except Exception as e:
        print(f"Failed to save benchmark table: {e}")

    return results

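benchmark_partitions() is defined but never called from main(); a hypothetical invocation for the partition experiment could look like:

    # Hypothetical call (not part of the committed main()):
    benchmark_partitions(scon, "/data/adsb/adsb20221203.txt", granularity="hour", partition_list=(5, 50, 500))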
def main():
    results = count_unique_aircraft(
        scon,
        "/data/adsb/adsb20221203.txt",
        granularity="day",
        partitions=1,
    )

    print(f"Number of time slots in the result: {len(results)}")
    if results:
        print("First 5 results:")
        for d, c in results[:5]:
            print(d.isoformat(), c)


if __name__ == "__main__":
    main()
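Because the master URL is hard-coded in sparkstart.py, the script is presumably launched directly rather than via spark-submit (the exact invocation is an assumption about the course environment):

    python "Aufgabe 5/main.py"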
22
Aufgabe 5/sparkstart.py
Normal file
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-

"""
Create a Spark configuration.
"""

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Connect to the cluster.
conf = SparkConf().setMaster("spark://193.174.205.250:7077").setAppName("HeisererValentin")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.executor.memory", "32g")
conf.set("spark.driver.memory", "8g")
conf.set("spark.cores.max", "40")
scon = SparkContext(conf=conf)

# Reuse the existing context for the SQL session; getOrCreate() attaches to
# the SparkContext created above instead of starting a second one.
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL") \
    .getOrCreate()
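Neither file stops the context when the job finishes; a minimal cleanup sketch, to be run once all jobs are done (an addition, not part of the committed code):

    # Hypothetical shutdown: releases the executors reserved by spark.cores.max.
    scon.stop()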