209 lines
7.3 KiB
Python
209 lines
7.3 KiB
Python
import httpx
|
||
import asyncio
|
||
|
||
# Root of the adsb.lol v2 REST API; endpoint paths are appended to it.
ADSB_LOL_BASE: str = "https://api.adsb.lol/v2"

# Transponder codes treated as emergencies, mapped to a display label.
EMERGENCY_SQUAWKS: dict[str, str] = {
    "7700": "GENERAL EMERGENCY",
    "7600": "RADIO FAILURE",
    "7500": "HIJACK / UNLAWFUL INTERFERENCE",
}

# Headers sent with every request to the API.
_HEADERS: dict[str, str] = {
    "User-Agent": "Mozilla/5.0 (compatible; GodsEye/3.0)",
    "Accept": "application/json",
}

# Regional civilian coverage queries, one point query per entry.
# Tuple layout: (lat, lon, radius_nm, label).
# NOTE(review): confirm the API honours radii this large rather than
# clamping them to a smaller documented maximum.
REGIONS: list[tuple[float, float, int, str]] = [
    (45.0, -95.0, 2500, "N.America"),
    (52.0, 8.0, 2000, "Europe"),
    (25.0, 60.0, 2000, "MiddleEast"),
    (35.0, 125.0, 2000, "E.Asia"),
    (5.0, 110.0, 2000, "SE.Asia"),
    (-10.0, -40.0, 2500, "S.America+Africa"),
]
|
||
|
||
|
||
def _parse_ac(ac: dict, military: bool = False) -> dict | None:
|
||
lat = ac.get("lat")
|
||
lon = ac.get("lon")
|
||
if lat is None or lon is None:
|
||
return None
|
||
alt_baro = ac.get("alt_baro", 0)
|
||
if alt_baro == "ground":
|
||
alt_m = 0
|
||
else:
|
||
alt_m = round(float(alt_baro) * 0.3048, 0) if isinstance(alt_baro, (int, float)) else 0
|
||
squawk = str(ac.get("squawk", "")).strip()
|
||
is_mil = military or bool(ac.get("mil")) or bool((ac.get("dbFlags", 0) or 0) & 1)
|
||
return {
|
||
"id": ac.get("hex", "UNKNOWN"),
|
||
"callsign": str(ac.get("flight", "")).strip() or ac.get("hex", "UNKNOWN"),
|
||
"country": ac.get("ownOp", ac.get("native", "")),
|
||
"lon": round(lon, 4),
|
||
"lat": round(lat, 4),
|
||
"alt": alt_m,
|
||
"velocity": round(float(ac.get("gs", 0) or 0) * 0.5144, 1),
|
||
"heading": round(float(ac.get("track", 0) or 0), 1),
|
||
"military": is_mil,
|
||
"squawk": squawk,
|
||
"_emergency": squawk in EMERGENCY_SQUAWKS,
|
||
"type": "plane",
|
||
# GPS quality fields for jamming detection
|
||
"nac_p": ac.get("nac_p"), # Navigation Accuracy Category (0–11, ≥9=normal)
|
||
"nic": ac.get("nic"), # Navigation Integrity Category (0=no integrity)
|
||
"sil": ac.get("sil"), # Source Integrity Level (0–3)
|
||
}
|
||
|
||
|
||
async def _fetch_military(client: httpx.AsyncClient) -> list[dict]:
    """Fetch the ``/mil`` endpoint and return its parsed aircraft.

    Every entry is parsed with ``military=True``; entries that
    ``_parse_ac`` rejects are dropped. Any non-200 status or exception is
    logged and yields an empty list, so this coroutine never raises.
    """
    try:
        response = await client.get(
            f"{ADSB_LOL_BASE}/mil", headers=_HEADERS, timeout=20.0
        )
        if response.status_code == 200:
            parsed = (
                _parse_ac(raw, military=True)
                for raw in response.json().get("ac", [])
            )
            aircraft = [entry for entry in parsed if entry]
            print(f"[AIRSPACE] Military: {len(aircraft)} aircraft")
            return aircraft
        print(f"[AIRSPACE] /v2/mil returned {response.status_code}")
        return []
    except Exception as err:
        # Best-effort boundary: a failed military query must not take the
        # rest of the sync down with it.
        print(f"[AIRSPACE] Military fetch error: {err}")
        return []
|
||
|
||
|
||
async def _fetch_region(client: httpx.AsyncClient, lat: float, lon: float, radius: int, label: str) -> list[dict]:
    """Fetch aircraft around one point and return the parsed records.

    Args:
        client: Shared HTTP client used for the request.
        lat: Latitude of the query centre.
        lon: Longitude of the query centre.
        radius: Query radius, inserted verbatim into the URL path.
        label: Human-readable region name, used only in log messages.

    Returns:
        The records accepted by ``_parse_ac``. An empty list is returned on
        any non-200 status or exception, so this coroutine never raises.
    """
    try:
        url = f"{ADSB_LOL_BASE}/point/{lat}/{lon}/{radius}"
        response = await client.get(url, headers=_HEADERS, timeout=20.0)
        if response.status_code != 200:
            # Log the failure (as the military fetch does) so an empty region
            # can be told apart from a rejected request.
            print(f"[AIRSPACE] Region {label} returned {response.status_code}")
            return []
        # `or []` treats an explicit null "ac" payload as an empty result
        # instead of surfacing it as a fetch error.
        ac_list = response.json().get("ac") or []
        return [plane for raw in ac_list if (plane := _parse_ac(raw))]
    except Exception as e:
        # Best-effort boundary: one failing region must not break the others.
        print(f"[AIRSPACE] Region {label} error: {e}")
        return []
|
||
|
||
|
||
def _compute_gps_interference(
    planes: list[dict],
    *,
    cell_deg: float = 2.0,
    min_anomalous: int = 3,
) -> list[dict]:
    """
    Real GPS jamming detection from ADS-B navigation accuracy fields.
    Same method as gpsjam.org: cluster aircraft with degraded NAC_P / NIC values.

    nac_p (Navigation Accuracy Category for Position):
      ≥ 9  = normal GPS (HPU < 30m)
      7    = HPU < 0.1 NM (~185m)
      ≤ 4  = HPU > 0.3 NM (~555m)  → GPS quality degraded, likely jamming
      0    = HPU unknown / no fix

    nic (Navigation Integrity Category):
      0    = no integrity assurance → strong spoofing/jamming indicator
      ≥ 7  = normal

    Args:
        planes: Aircraft dicts carrying "lat", "lon", "nac_p" and "nic" keys.
        cell_deg: Edge length, in degrees, of the lat/lon grid cells used to
            cluster degraded aircraft. Must be positive.
        min_anomalous: Minimum number of degraded aircraft a cell needs
            before it is reported as a zone.

    Returns:
        One dict per reported cell, in first-seen order, with the centroid
        of the degraded aircraft ("lat", "lon"), "intensity"
        ("CRITICAL" / "HIGH" / "ACTIVE"), "aircraft_count", "min_nac_p" and
        "source". A summary line is printed when at least one zone is found.

    Raises:
        ValueError: If ``cell_deg`` is not positive.
    """
    if cell_deg <= 0:
        raise ValueError("cell_deg must be positive")

    cells: dict[tuple[float, float], dict] = {}

    for plane in planes:
        lat, lon = plane.get("lat"), plane.get("lon")
        if lat is None or lon is None:
            continue

        # Both degradation rules need a nac_p reading, so a missing or
        # non-numeric value can never flag an aircraft.
        nac = plane.get("nac_p")
        if not isinstance(nac, (int, float)):
            continue

        # Degraded when accuracy is very poor on its own, or when accuracy is
        # below normal AND the integrity category reports no assurance.
        degraded = nac <= 4 or (nac < 9 and plane.get("nic") == 0)
        if not degraded:
            continue

        # Snap to the nearest grid node to pick the cell.
        key = (round(lat / cell_deg) * cell_deg, round(lon / cell_deg) * cell_deg)
        bucket = cells.setdefault(key, {"lats": [], "lons": [], "min_nac": nac})
        bucket["lats"].append(lat)
        bucket["lons"].append(lon)
        bucket["min_nac"] = min(bucket["min_nac"], nac)

    zones = []
    for bucket in cells.values():
        count = len(bucket["lats"])
        if count < min_anomalous:
            continue
        min_nac = bucket["min_nac"]
        if count >= 10 or min_nac == 0:
            intensity = "CRITICAL"
        elif count >= 5 or min_nac <= 2:
            intensity = "HIGH"
        else:
            intensity = "ACTIVE"
        # Use centroid of affected aircraft for more accurate placement
        zones.append({
            "lat": round(sum(bucket["lats"]) / count, 2),
            "lon": round(sum(bucket["lons"]) / count, 2),
            "intensity": intensity,
            "aircraft_count": count,
            "min_nac_p": min_nac,
            "source": "ADS-B NAC/NIC",
        })

    if zones:
        print(f"[GPS] {len(zones)} jamming zones detected from {sum(z['aircraft_count'] for z in zones)} anomalous aircraft")
    return zones
|
||
|
||
|
||
async def fetch_planes() -> dict:
    """Fetch military and regional aircraft, merge them and detect GPS zones.

    The military query and every regional query run concurrently. Regional
    results are de-duplicated by aircraft id, with aircraft already present
    in the military result taking precedence.

    Returns:
        A dict with:
          * "planes": military aircraft, then civilian emergencies, then the
            remaining civilian aircraft; each aircraft appears once.
          * "interference": zones from ``_compute_gps_interference``.
          * "emergencies": civilian aircraft squawking an emergency code.
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        # return_exceptions=True so a failure in one query cannot cancel the
        # rest; non-list results are skipped below.
        results = await asyncio.gather(
            _fetch_military(client),
            *(
                _fetch_region(client, lat, lon, radius, label)
                for lat, lon, radius, label in REGIONS
            ),
            return_exceptions=True,
        )

    mil_result, *region_results = results
    mil_planes = mil_result if isinstance(mil_result, list) else []

    # Seed the de-duplication set with military ids so a regional copy of the
    # same airframe never replaces the military-flagged record.
    seen: set[str] = {p["id"] for p in mil_planes}
    emergencies: list[dict] = []
    regular: list[dict] = []
    for region_result in region_results:
        if not isinstance(region_result, list):
            continue
        for plane in region_result:
            if plane["id"] in seen:
                continue
            seen.add(plane["id"])
            # Pop the internal flag exactly once and route the aircraft to a
            # single bucket. Popping in one pass and filtering on the same key
            # in a second pass would list every emergency aircraft twice.
            if plane.pop("_emergency", False):
                emergencies.append(plane)
            else:
                regular.append(plane)

    # Military aircraft are not reported as emergencies here; just strip the
    # internal flag so it does not leak into the output.
    for plane in mil_planes:
        plane.pop("_emergency", None)

    final = mil_planes + emergencies + regular
    interference = _compute_gps_interference(final)
    print(f"[AIRSPACE] Synced {len(final)} aircraft ({len(mil_planes)} mil, {len(emergencies)} emergency) | {len(interference)} GPS zones")
    return {"planes": final, "interference": interference, "emergencies": emergencies}
|
||
|
||
|
||
if __name__ == "__main__":
    # Manual smoke test: run one full sync and print headline counts.
    result = asyncio.run(fetch_planes())
    military_total = sum(1 for p in result["planes"] if p["military"])
    print(f"Total: {len(result['planes'])}, Military: {military_total}")
|