Files
2026-03-10 12:09:48 +01:00

326 lines
11 KiB
Python

import asyncio
import json
from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from pathlib import Path
from dotenv import load_dotenv
# Load environment variables before anything else
load_dotenv()
from services.opensky import fetch_planes
from services.satellites import get_satellite_positions
from services.news import fetch_news
from services.ai import analyze_threat
from services.webcams import fetch_webcams
from services.bgp import fetch_bgp_status
from services.ais import fetch_ships, ais_worker, ais_pruner
from services.conflicts import fetch_conflicts
from services.cyber import fetch_cyber_warfare
HISTORY_DIR = Path(__file__).parent / "data_history"
HISTORY_DIR.mkdir(exist_ok=True)
MAX_SNAPSHOTS = 120 # keep last ~2 hours at 1min intervals
clients: set = set()
current_data = {
"planes": [],
"satellites": [],
"news": [],
"gps_interference": [],
"emergency_squawks": [],
"ai_analysis": {
"summary": "System initializing...",
"threat_level": "UNKNOWN",
"predictions": []
},
"webcams": [],
"bgp": {},
"ships": [],
"conflicts": [],
"cyber_attacks": [],
"last_updated": {}
}
# Broadcast cache — only rebuilt when data changes
_full_payload_cache: str | None = None
_full_payload_version: int = 0
_data_version: dict[str, int] = {}
def _ts() -> str:
    """Return the current UTC time as an ISO 8601 string."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()
def _mark_changed(key: str) -> None:
    """Record that the feed ``key`` changed and invalidate broadcast state.

    Increments the per-key counter in ``_data_version``, clears the cached
    payload, and advances ``_full_payload_version``.

    The version bump is essential: ``broadcast_updates`` only pushes to
    clients when ``_full_payload_version`` differs from the version it last
    sent. Without it the counter stays at 0 forever and clients receive a
    single broadcast and nothing afterwards.

    Args:
        key: Name of the feed that was just refreshed (e.g. ``"ships"``).
    """
    global _full_payload_cache, _full_payload_version
    _data_version[key] = _data_version.get(key, 0) + 1
    _full_payload_cache = None
    _full_payload_version += 1
def _get_broadcast_payload() -> str:
    """Serialize the current state as a compact JSON ``update`` message.

    The message is ``{"type": "update"}`` merged with every key of
    ``current_data`` and is rebuilt on each call.
    """
    # No aggressive version caching to ensure immediate movement
    message = {"type": "update"}
    message.update(current_data)
    return json.dumps(message, separators=(",", ":"))
async def update_maritime():
    """Refresh the ship list forever, sleeping 30 seconds between attempts.

    Any exception raised during a refresh is printed and the loop continues.
    """
    while True:
        try:
            ships = await fetch_ships()
            current_data["ships"] = ships
            current_data["last_updated"]["ships"] = _ts()
            _mark_changed("ships")
        except Exception as exc:
            print(f"[MARITIME] Error: {exc}")
        await asyncio.sleep(30)
async def update_planes_and_gps():
    """Refresh aircraft, GPS-interference and emergency-squawk data forever.

    The stored data is only replaced when the fetch returns a dict with a
    non-empty ``"planes"`` entry. Sleeps 20 seconds between attempts; errors
    are printed and the loop continues.
    """
    while True:
        try:
            result = await fetch_planes()
            planes = result.get("planes") if isinstance(result, dict) else None
            if planes:
                current_data["planes"] = planes
                current_data["gps_interference"] = result.get("interference", [])
                current_data["emergency_squawks"] = result.get("emergencies", [])
                current_data["last_updated"]["planes"] = _ts()
                _mark_changed("planes")
        except Exception as exc:
            print(f"[AIRSPACE] Error: {exc}")
        await asyncio.sleep(20)
async def update_satellites():
    """Refresh satellite positions forever, sleeping 60 seconds between attempts.

    Errors are printed and the loop continues.
    """
    while True:
        try:
            # Satellites positions are computed from TLEs.
            # The TLE fetch itself should be cached in satellites.py.
            positions = await get_satellite_positions()
            current_data["satellites"] = positions
            current_data["last_updated"]["satellites"] = _ts()
            _mark_changed("satellites")
        except Exception as exc:
            print(f"[SPACE] Error: {exc}")
        await asyncio.sleep(60)  # Position computation update interval
async def update_cyber():
    """Refresh cyber-attack data forever, sleeping 300 seconds between attempts.

    The currently stored BGP status is passed to the fetcher. An empty
    result leaves the previous data in place; errors are printed and the
    loop continues.
    """
    while True:
        try:
            cyber_data = await fetch_cyber_warfare(current_data.get("bgp"))
            if cyber_data:  # Only update if we got real data
                current_data["cyber_attacks"] = cyber_data
                current_data["last_updated"]["cyber_attacks"] = _ts()
                _mark_changed("cyber_attacks")
        except Exception as exc:
            print(f"[CYBER] Error: {exc}")
        await asyncio.sleep(300)  # ThreatFox limits (5 mins)
async def update_news_and_ai():
    """Refresh news and BGP status, then re-run the threat analysis, forever.

    Empty news or BGP results leave the previous values in place. The
    analysis only runs when there is at least some news or conflict data.
    Sleeps 300 seconds between attempts; errors are printed and the loop
    continues.
    """
    while True:
        try:
            news = await fetch_news()
            bgp = await fetch_bgp_status()
            if news:
                current_data["news"] = news
            if bgp:
                current_data["bgp"] = bgp
            current_data["last_updated"]["news"] = _ts()
            _mark_changed("news")

            if current_data["news"] or current_data["conflicts"]:
                current_data["ai_analysis"] = await analyze_threat(
                    current_data["news"],
                    current_data["gps_interference"],
                    current_data["bgp"],
                    conflicts=current_data["conflicts"],
                    ships=current_data["ships"],
                    planes=current_data["planes"],
                )
                _mark_changed("ai")
        except Exception as exc:
            print(f"[INTEL] Error: {exc}")
        await asyncio.sleep(300)  # RSS and AI limits (5 mins)
async def update_conflicts():
    """Refresh conflict data forever, sleeping 600 seconds between attempts.

    An empty result leaves the previous data in place; errors are printed
    and the loop continues.
    """
    while True:
        try:
            latest = await fetch_conflicts()
            if latest:
                current_data["conflicts"] = latest
                current_data["last_updated"]["conflicts"] = _ts()
                _mark_changed("conflicts")
        except Exception as exc:
            print(f"[CONFLICTS] Error: {exc}")
        await asyncio.sleep(600)  # GDELT data (10 mins)
async def update_webcams():
    """Refresh the webcam list forever, sleeping 3600 seconds between attempts.

    An empty result leaves the previous data in place; errors are printed
    and the loop continues.
    """
    while True:
        try:
            latest = await fetch_webcams()
            if latest:
                current_data["webcams"] = latest
                current_data["last_updated"]["webcams"] = _ts()
                _mark_changed("webcams")
        except Exception as exc:
            print(f"[WEBCAMS] Error: {exc}")
        await asyncio.sleep(3600)  # Rarely changes
async def broadcast_updates():
    """Push the current payload to every connected client when it is stale.

    Every 2 seconds the loop compares ``_full_payload_version`` with the
    version it last sent. If they differ and at least one client is
    connected, the payload is sent to each client in turn. Clients whose
    send raises are removed from ``clients``.
    """
    sent_version = -1
    while True:
        needs_push = bool(clients) and _full_payload_version != sent_version
        if needs_push:
            payload = _get_broadcast_payload()
            sent_version = _full_payload_version
            failed: set = set()
            # Iterate over a copy: the set may change while we await sends.
            for ws in tuple(clients):
                try:
                    await ws.send_text(payload)
                except Exception:
                    failed.add(ws)
            clients.difference_update(failed)
        await asyncio.sleep(2)
def _save_snapshot():
    """Write the current data to a timestamped JSON file and prune old ones.

    The file is named ``<UTC timestamp>.json`` inside ``HISTORY_DIR``. After
    writing, all but the last ``MAX_SNAPSHOTS`` files (in sorted filename
    order) are deleted.
    """
    file_stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

    snapshot = {"timestamp": _ts()}
    for feed in ("planes", "ships", "satellites"):
        snapshot[f"{feed}_count"] = len(current_data[feed])
    for feed in (
        "planes",
        "ships",
        "gps_interference",
        "emergency_squawks",
        "news",
        "conflicts",
        "ai_analysis",
    ):
        snapshot[feed] = current_data[feed]

    target = HISTORY_DIR / f"{file_stamp}.json"
    target.write_text(json.dumps(snapshot, separators=(",", ":")))

    # Prune old snapshots
    stale = sorted(HISTORY_DIR.glob("*.json"))[:-MAX_SNAPSHOTS]
    for old_file in stale:
        old_file.unlink(missing_ok=True)
async def save_snapshots():
    """Save a data snapshot every 60 seconds for time-travel review.

    The first snapshot is taken after the initial 60-second wait. Errors
    are printed and the loop continues.
    """
    while True:
        await asyncio.sleep(60)
        try:
            _save_snapshot()
        except Exception as exc:
            print(f"[HISTORY] Snapshot error: {exc}")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the background workers for the lifetime of the application.

    On startup every worker coroutine is scheduled as an asyncio task. On
    shutdown, including when the application body raises, every task is
    cancelled and then awaited for a bounded time so that cancellation is
    actually delivered before the event loop goes away.

    Args:
        app: The FastAPI application (unused; required by the lifespan
            protocol).
    """
    workers = (
        ais_worker,
        ais_pruner,
        update_maritime,
        update_planes_and_gps,
        update_satellites,
        update_cyber,
        update_news_and_ai,
        update_webcams,
        update_conflicts,
        broadcast_updates,
        save_snapshots,
    )
    tasks = [asyncio.create_task(worker()) for worker in workers]
    print("[ARGUS NEXUS] All 9 data pipelines online.")
    try:
        yield
    finally:
        for task in tasks:
            task.cancel()
        # Wait for the cancellations to be processed, but never block
        # shutdown indefinitely on a worker that refuses to stop.
        await asyncio.wait(tasks, timeout=5)
# Application instance; background workers are managed by ``lifespan``.
app = FastAPI(lifespan=lifespan)

# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive CORS policy — confirm this is intended for deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
async def root():
    """Report system status, connected-client count and per-feed item counts."""
    # Response label -> key in ``current_data`` whose length is reported.
    counted_feeds = {
        "planes": "planes",
        "satellites": "satellites",
        "ships": "ships",
        "news": "news",
        "webcams": "webcams",
        "gps_interference": "gps_interference",
        "emergencies": "emergency_squawks",
        "conflicts": "conflicts",
    }
    data_counts = {
        label: len(current_data[feed]) for label, feed in counted_feeds.items()
    }
    return {
        "status": "ONLINE",
        "system": "ARGUS NEXUS",
        "version": "2.1.0",
        "active_clients": len(clients),
        "data_counts": data_counts,
        "last_updated": current_data["last_updated"],
    }
@app.get("/health")
async def health():
    """Return a fixed "healthy" status together with the current UTC time."""
    now = _ts()
    return {"status": "healthy", "timestamp": now}
@app.get("/history")
async def list_history():
    """List available data snapshots for time-travel review.

    Returns summary metadata for up to ``MAX_SNAPSHOTS`` snapshot files,
    in reverse filename order. Files that cannot be read or summarized are
    skipped.
    """
    newest_first = sorted(HISTORY_DIR.glob("*.json"), reverse=True)
    snapshots = []
    for snapshot_file in newest_first[:MAX_SNAPSHOTS]:
        try:
            meta = json.loads(snapshot_file.read_text())
            summary = {
                "id": snapshot_file.stem,
                "timestamp": meta.get("timestamp", ""),
                "planes": meta.get("planes_count", 0),
                "ships": meta.get("ships_count", 0),
                "gps_zones": len(meta.get("gps_interference", [])),
                "news": len(meta.get("news", [])),
                "threat_level": meta.get("ai_analysis", {}).get("threat_level", ""),
            }
        except Exception:
            # Best effort: an unreadable or malformed snapshot is left out.
            continue
        snapshots.append(summary)
    return {"snapshots": snapshots}
@app.get("/history/{snapshot_id}")
async def get_snapshot(snapshot_id: str):
    """Retrieve a specific historical data snapshot.

    Args:
        snapshot_id: Snapshot file name without the ``.json`` suffix, as
            listed by ``/history``.

    Returns:
        The parsed snapshot, or a 404 JSON response with body
        ``{"error": "Snapshot not found"}`` when no such snapshot exists.
    """
    # Local import keeps this block self-contained; fastapi is already a
    # dependency of this module.
    from fastapi.responses import JSONResponse

    path = HISTORY_DIR / f"{snapshot_id}.json"
    # Only serve files that live directly inside HISTORY_DIR, so an ID that
    # contains path separators or ".." segments cannot escape the directory.
    inside_history = path.resolve().parent == HISTORY_DIR.resolve()
    if not inside_history or not path.is_file():
        # Returning ``body, 404`` (a tuple) would be serialized by FastAPI as
        # a JSON array with status 200; an explicit response sets the status.
        return JSONResponse(
            status_code=404, content={"error": "Snapshot not found"}
        )
    return json.loads(path.read_text())
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Register a WebSocket client and keep the connection open.

    The client immediately receives the current payload. Incoming messages
    are read and discarded. When sending or receiving raises, the client is
    unregistered.
    """
    await websocket.accept()
    clients.add(websocket)
    print(f"[WS] Client connected. Total: {len(clients)}")
    try:
        initial_payload = _get_broadcast_payload()
        await websocket.send_text(initial_payload)
        # Block on reads so the handler stays alive until the peer goes away.
        while True:
            await websocket.receive_text()
    except Exception:
        pass
    finally:
        clients.discard(websocket)
        print(f"[WS] Client disconnected. Total: {len(clients)}")
# Script entry point: serve the app with uvicorn on all interfaces, port 8000.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)