mirror of
https://github.com/Vale54321/BigData.git
synced 2025-12-11 09:59:33 +01:00
146 lines
4.3 KiB
Python
146 lines
4.3 KiB
Python
import os
|
|
import re
|
|
import time
|
|
import argparse
|
|
from datetime import datetime
|
|
from typing import Iterable, List, Optional, Tuple
|
|
|
|
from sparkstart import scon
|
|
import pandas as pd
|
|
import matplotlib.pyplot as plt
|
|
|
|
def _safe_split(line: str) -> list:
|
|
return line.rstrip("\n").split(",")
|
|
|
|
|
|
def count_unique_aircraft(
|
|
sc,
|
|
filepath: str,
|
|
granularity: str = "hour",
|
|
partitions: Optional[int] = None,
|
|
) -> Tuple[List[Tuple[datetime, int]], str]:
|
|
assert granularity in {"hour", "day"}, "granularity must be 'hour' or 'day'"
|
|
|
|
lines = sc.textFile(filepath)
|
|
|
|
parts = lines.map(_safe_split)
|
|
if partitions:
|
|
parts = parts.coalesce(partitions)
|
|
|
|
def valid_row(cols: list) -> bool:
|
|
try:
|
|
if len(cols) < 8:
|
|
return False
|
|
msg_type = cols[1].strip()
|
|
return msg_type in {"2", "3"}
|
|
except Exception:
|
|
return False
|
|
|
|
filtered = parts.filter(valid_row)
|
|
|
|
def to_slot_aircraft(cols: list) -> Optional[Tuple[str, str]]:
|
|
try:
|
|
aircraft = cols[4].strip()
|
|
date_s = cols[6].strip()
|
|
time_s = cols[7].strip()
|
|
if granularity == "day":
|
|
slot = date_s
|
|
else: # hour
|
|
hh = time_s.split(":")[0] if ":" in time_s else time_s[:2]
|
|
if not hh:
|
|
return None
|
|
slot = f"{date_s} {hh}:00"
|
|
if not aircraft:
|
|
return None
|
|
return (slot, aircraft)
|
|
except Exception:
|
|
return None
|
|
|
|
slot_aircraft = filtered.map(to_slot_aircraft).filter(lambda x: x is not None)
|
|
|
|
unique_pairs = slot_aircraft.distinct()
|
|
|
|
counts_by_slot = unique_pairs.map(lambda sa: (sa[0], 1)).reduceByKey(lambda a, b: a + b)
|
|
|
|
sorted_counts = counts_by_slot.sortByKey(ascending=True)
|
|
|
|
def to_dt_count(slot_count: Tuple[str, int]) -> Optional[Tuple[datetime, int]]:
|
|
slot, cnt = slot_count
|
|
try:
|
|
if granularity == "day":
|
|
dt = datetime.strptime(slot, "%Y/%m/%d")
|
|
else:
|
|
dt = datetime.strptime(slot, "%Y/%m/%d %H:%M")
|
|
return (dt, int(cnt))
|
|
except (ValueError, TypeError):
|
|
return None
|
|
|
|
dt_counts = sorted_counts.map(to_dt_count).filter(lambda x: x is not None).collect()
|
|
|
|
if dt_counts:
|
|
dt_counts.sort(key=lambda x: x[0])
|
|
out_dir = os.path.dirname(__file__)
|
|
df = pd.DataFrame({
|
|
"datetime": [d for d, _ in dt_counts],
|
|
"count": [c for _, c in dt_counts],
|
|
})
|
|
df = df.sort_values("datetime")
|
|
plt.figure(figsize=(10, 4))
|
|
plt.plot(df["datetime"], df["count"], marker="o", linestyle="-", color="tab:blue")
|
|
plt.title(f"Distinct aircraft per {granularity}")
|
|
plt.xlabel("Time")
|
|
plt.ylabel("# Aircraft")
|
|
plt.grid(True, alpha=0.3)
|
|
plt.tight_layout()
|
|
plt.show()
|
|
|
|
return dt_counts
|
|
|
|
|
|
def benchmark_partitions(
|
|
sc,
|
|
filepath: str,
|
|
granularity: str = "hour",
|
|
partition_list: Iterable[int] = (5, 50, 500),
|
|
) -> List[Tuple[int, float]]:
|
|
results: List[Tuple[int, float]] = []
|
|
for p in partition_list:
|
|
t0 = time.perf_counter()
|
|
_ = count_unique_aircraft(sc, filepath, granularity=granularity, partitions=p)
|
|
dt = time.perf_counter() - t0
|
|
print(f"Partitionen={p}: Laufzeit {dt:.2f} s")
|
|
results.append((p, dt))
|
|
|
|
# Save quick markdown table
|
|
out_dir = os.path.dirname(__file__)
|
|
md_path = os.path.join(out_dir, f"bench_partitions_{granularity}.md")
|
|
try:
|
|
with open(md_path, "w", encoding="utf-8") as f:
|
|
f.write("| Partitions | Wall time (s) |\n|---:|---:|\n")
|
|
for p, dt in results:
|
|
f.write(f"| {p} | {dt:.2f} |\n")
|
|
print(f"Benchmark-Tabelle gespeichert: {md_path}")
|
|
except Exception as e:
|
|
print(f"Fehler beim Speichern der Benchmark-Tabelle: {e}")
|
|
|
|
return results
|
|
|
|
|
|
def main():
|
|
results = count_unique_aircraft(
|
|
scon,
|
|
"/data/adsb/adsb20221203.txt",
|
|
granularity="day",
|
|
partitions=1
|
|
)
|
|
|
|
print(f"Anzahl Zeit-Slots im Ergebnis: {len(results)}")
|
|
if results:
|
|
print("Erste 5 Ergebnisse:")
|
|
for d, c in results[:5]:
|
|
print(d.isoformat(), c)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|