"""Live airspace snapshot built from the adsb.lol v2 API.

Fetches the military feed plus a set of wide regional queries, normalises
each aircraft record into a flat dict, and derives likely GPS-interference
zones from the ADS-B navigation-quality fields (NAC_P / NIC).
"""
import asyncio

import httpx

ADSB_LOL_BASE = "https://api.adsb.lol/v2"

# Transponder codes treated as emergencies, mapped to a display label.
EMERGENCY_SQUAWKS = {
    "7700": "GENERAL EMERGENCY",
    "7600": "RADIO FAILURE",
    "7500": "HIJACK / UNLAWFUL INTERFERENCE",
}

_HEADERS = {
    "User-Agent": "Mozilla/5.0 (compatible; GodsEye/3.0)",
    "Accept": "application/json",
}

# Regional civilian coverage queries: (lat, lon, radius_nm, label)
# NOTE(review): the adsb.lol API docs appear to cap the /point radius at
# 250 nm — confirm these larger radii are honoured and not clamped/rejected.
REGIONS = [
    (45.0, -95.0, 2500, "N.America"),
    (52.0, 8.0, 2000, "Europe"),
    (25.0, 60.0, 2000, "MiddleEast"),
    (35.0, 125.0, 2000, "E.Asia"),
    (5.0, 110.0, 2000, "SE.Asia"),
    (-10.0, -40.0, 2500, "S.America+Africa"),
]

# Unit-conversion factors applied to the raw feed values.
_FEET_TO_METRES = 0.3048
_KNOTS_TO_MPS = 0.5144


def _to_float(value: object) -> float:
    """Return *value* as a float, mapping missing or malformed values to 0.0."""
    try:
        return float(value or 0)
    except (TypeError, ValueError):
        return 0.0


def _parse_ac(ac: dict, military: bool = False) -> dict | None:
    """Normalise one raw aircraft record from the API into the app's schema.

    Args:
        ac: A single entry of the response's ``"ac"`` list.
        military: Force the ``military`` flag (used for the ``/mil`` feed).

    Returns:
        The normalised aircraft dict, or ``None`` when the record has no
        usable position (missing or non-numeric ``lat`` / ``lon``).
    """
    lat = ac.get("lat")
    lon = ac.get("lon")
    if lat is None or lon is None:
        return None
    try:
        lat_rounded = round(lat, 4)
        lon_rounded = round(lon, 4)
    except TypeError:
        # A malformed position must only cost us this record; raising here
        # would make the calling fetcher discard its whole result set.
        return None

    # alt_baro is either a number or a non-numeric marker such as "ground";
    # anything non-numeric is reported as altitude 0.
    alt_baro = ac.get("alt_baro", 0)
    if isinstance(alt_baro, (int, float)):
        alt_m = round(float(alt_baro) * _FEET_TO_METRES, 0)
    else:
        alt_m = 0

    hex_id = ac.get("hex", "UNKNOWN")
    # `or ""` keeps an explicit JSON null from turning into the text "None".
    squawk = str(ac.get("squawk") or "").strip()
    callsign = str(ac.get("flight") or "").strip() or hex_id

    # Bit 0 of dbFlags marks the airframe as military.
    db_flags = ac.get("dbFlags") or 0
    flagged_mil = isinstance(db_flags, int) and bool(db_flags & 1)
    is_mil = military or bool(ac.get("mil")) or flagged_mil

    return {
        "id": hex_id,
        "callsign": callsign,
        "country": ac.get("ownOp", ac.get("native", "")),
        "lon": lon_rounded,
        "lat": lat_rounded,
        "alt": alt_m,
        "velocity": round(_to_float(ac.get("gs")) * _KNOTS_TO_MPS, 1),
        "heading": round(_to_float(ac.get("track")), 1),
        "military": is_mil,
        "squawk": squawk,
        # Internal marker; fetch_planes() strips it before returning.
        "_emergency": squawk in EMERGENCY_SQUAWKS,
        "type": "plane",
        # GPS quality fields for jamming detection
        "nac_p": ac.get("nac_p"),  # Navigation Accuracy Category (0–11, ≥9=normal)
        "nic": ac.get("nic"),      # Navigation Integrity Category (0=no integrity)
        "sil": ac.get("sil"),      # Source Integrity Level (0–3)
    }


def _parse_all(ac_list: list, military: bool = False) -> list[dict]:
    """Parse every dict entry of *ac_list*, dropping records without a position."""
    parsed = (_parse_ac(ac, military=military) for ac in ac_list if isinstance(ac, dict))
    return [plane for plane in parsed if plane]


async def _fetch_military(client: httpx.AsyncClient) -> list[dict]:
    """Fetch and parse the ``/mil`` feed; returns ``[]`` on any failure."""
    try:
        r = await client.get(f"{ADSB_LOL_BASE}/mil", headers=_HEADERS, timeout=20.0)
        if r.status_code != 200:
            print(f"[AIRSPACE] /v2/mil returned {r.status_code}")
            return []
        planes = _parse_all(r.json().get("ac") or [], military=True)
        print(f"[AIRSPACE] Military: {len(planes)} aircraft")
        return planes
    except Exception as e:
        # Deliberate best-effort boundary: a failed feed must not abort the sync.
        print(f"[AIRSPACE] Military fetch error: {e}")
        return []


async def _fetch_region(
    client: httpx.AsyncClient, lat: float, lon: float, radius: int, label: str
) -> list[dict]:
    """Fetch and parse aircraft around one point; returns ``[]`` on any failure.

    Args:
        client: Shared HTTP client.
        lat: Query centre latitude.
        lon: Query centre longitude.
        radius: Query radius passed to the ``/point`` endpoint.
        label: Human-readable region name, used only in log output.
    """
    try:
        url = f"{ADSB_LOL_BASE}/point/{lat}/{lon}/{radius}"
        r = await client.get(url, headers=_HEADERS, timeout=20.0)
        if r.status_code != 200:
            # Logged for parity with _fetch_military so outages are visible.
            print(f"[AIRSPACE] Region {label} returned {r.status_code}")
            return []
        return _parse_all(r.json().get("ac") or [])
    except Exception as e:
        # Deliberate best-effort boundary: one region failing is tolerated.
        print(f"[AIRSPACE] Region {label} error: {e}")
        return []


def _compute_gps_interference(
    planes: list[dict], *, cell_deg: float = 2.0, min_anomalous: int = 3
) -> list[dict]:
    """Derive GPS-interference zones from ADS-B navigation accuracy fields.

    Aircraft reporting degraded NAC_P / NIC values are clustered on a
    latitude/longitude grid (an approach similar to gpsjam.org); a cell
    becomes a zone once enough degraded aircraft fall into it.

    nac_p (Navigation Accuracy Category for Position):
      ≥ 9  = normal GPS (HPU < 30m)
      7    = HPU < 0.1 NM (~185m)
      ≤ 4  = HPU > 0.3 NM (~555m)  → GPS quality degraded, likely jamming
      0    = HPU unknown / no fix

    nic (Navigation Integrity Category):
      0    = no integrity assurance → strong spoofing/jamming indicator
      ≥ 7  = normal

    Args:
        planes: Normalised aircraft dicts as produced by ``_parse_ac``.
        cell_deg: Grid cell size in degrees.
        min_anomalous: Minimum number of degraded aircraft per cell.

    Returns:
        One dict per zone with the centroid of the affected aircraft, an
        intensity label, the aircraft count and the lowest NAC_P seen.
    """
    cells: dict[tuple, dict] = {}
    for p in planes:
        lat, lon = p.get("lat"), p.get("lon")
        if lat is None or lon is None:
            continue

        nac = p.get("nac_p")
        # Both degradation rules need a numeric NAC_P; a missing or malformed
        # value can never qualify, and must not crash the comparison below.
        if not isinstance(nac, (int, float)):
            continue
        gps_degraded = nac <= 4 or (p.get("nic") == 0 and nac < 9)
        if not gps_degraded:
            continue

        # Snap to grid cell
        cell = (round(lat / cell_deg) * cell_deg, round(lon / cell_deg) * cell_deg)
        bucket = cells.setdefault(cell, {"lats": [], "lons": [], "nacs": []})
        bucket["lats"].append(lat)
        bucket["lons"].append(lon)
        bucket["nacs"].append(nac)

    zones = []
    for bucket in cells.values():
        count = len(bucket["lats"])
        if count < min_anomalous:
            continue
        min_nac = min(bucket["nacs"])

        if count >= 10 or min_nac == 0:
            intensity = "CRITICAL"
        elif count >= 5 or min_nac <= 2:
            intensity = "HIGH"
        else:
            intensity = "ACTIVE"

        # Use centroid of affected aircraft for more accurate placement
        zones.append({
            "lat": round(sum(bucket["lats"]) / count, 2),
            "lon": round(sum(bucket["lons"]) / count, 2),
            "intensity": intensity,
            "aircraft_count": count,
            "min_nac_p": min_nac,
            "source": "ADS-B NAC/NIC",
        })

    if zones:
        print(f"[GPS] {len(zones)} jamming zones detected from {sum(z['aircraft_count'] for z in zones)} anomalous aircraft")
    return zones


async def fetch_planes() -> dict:
    """Fetch all feeds concurrently and return one merged airspace snapshot.

    Returns:
        A dict with three keys:
          ``planes``       – military, then emergency, then regular aircraft,
                             each aircraft appearing exactly once;
          ``interference`` – zones from ``_compute_gps_interference``;
          ``emergencies``  – civilian aircraft squawking an emergency code.
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        # Military first, then all regions in parallel
        tasks = [_fetch_military(client)] + [
            _fetch_region(client, lat, lon, r, label)
            for lat, lon, r, label in REGIONS
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    # With return_exceptions=True a failed task yields an exception object
    # instead of a list, so every result is type-checked before use.
    mil_planes = results[0] if isinstance(results[0], list) else []

    # Merge regional civilian results, deduplicate (regions overlap, and the
    # military feed takes precedence over a regional sighting).
    seen: set[str] = {p["id"] for p in mil_planes}
    civilian: list[dict] = []
    for region_result in results[1:]:
        if not isinstance(region_result, list):
            continue
        for p in region_result:
            if p["id"] not in seen:
                seen.add(p["id"])
                civilian.append(p)

    # Partition in a single pass. Popping the internal flag and then
    # re-testing it afterwards would classify every aircraft as "regular"
    # and list each emergency aircraft twice in the final result.
    emergencies: list[dict] = []
    regular: list[dict] = []
    for p in civilian:
        (emergencies if p.pop("_emergency", False) else regular).append(p)
    for p in mil_planes:
        p.pop("_emergency", None)

    final = mil_planes + emergencies + regular
    interference = _compute_gps_interference(final)

    print(f"[AIRSPACE] Synced {len(final)} aircraft ({len(mil_planes)} mil, {len(emergencies)} emergency) | {len(interference)} GPS zones")
    return {"planes": final, "interference": interference, "emergencies": emergencies}


if __name__ == "__main__":
    import json

    result = asyncio.run(fetch_planes())
    print(f"Total: {len(result['planes'])}, Military: {sum(1 for p in result['planes'] if p['military'])}")