326 lines
11 KiB
Python
326 lines
11 KiB
Python
import asyncio
|
|
import json
|
|
from fastapi import FastAPI, WebSocket
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from contextlib import asynccontextmanager
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables before anything else
|
|
load_dotenv()
|
|
|
|
from services.opensky import fetch_planes
|
|
from services.satellites import get_satellite_positions
|
|
from services.news import fetch_news
|
|
from services.ai import analyze_threat
|
|
from services.webcams import fetch_webcams
|
|
from services.bgp import fetch_bgp_status
|
|
from services.ais import fetch_ships, ais_worker, ais_pruner
|
|
from services.conflicts import fetch_conflicts
|
|
from services.cyber import fetch_cyber_warfare
|
|
|
|
# Snapshot files live in a "data_history" folder next to this module; the
# folder is created at import time when it does not exist yet.
HISTORY_DIR = Path(__file__).parent.joinpath("data_history")
HISTORY_DIR.mkdir(exist_ok=True)

# Number of snapshot files retained (keep last ~2 hours at 1min intervals).
MAX_SNAPSHOTS = 120
|
|
|
|
# WebSocket connections currently registered by the /ws endpoint.
clients: set = set()

# Latest value of every feed. Each update_* loop overwrites its own entry and
# records the refresh time under "last_updated".
current_data = {
    "planes": [],
    "satellites": [],
    "news": [],
    "gps_interference": [],
    "emergency_squawks": [],
    # Placeholder shown until the first threat analysis completes.
    "ai_analysis": {
        "summary": "System initializing...",
        "threat_level": "UNKNOWN",
        "predictions": [],
    },
    "webcams": [],
    "bgp": {},
    "ships": [],
    "conflicts": [],
    "cyber_attacks": [],
    "last_updated": {},
}
|
|
|
|
# Broadcast cache — only rebuilt when data changes
|
|
_full_payload_cache: str | None = None
|
|
_full_payload_version: int = 0
|
|
_data_version: dict[str, int] = {}
|
|
|
|
def _ts():
    """Return the current UTC time as an ISO 8601 string."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()
|
|
|
|
def _mark_changed(key: str):
    """Record that the feed named *key* has new data.

    Increments the per-feed counter in ``_data_version``, advances the global
    ``_full_payload_version`` and clears ``_full_payload_cache``.

    Args:
        key: Name of the feed that changed (e.g. ``"ships"``).
    """
    global _full_payload_cache, _full_payload_version
    _data_version[key] = _data_version.get(key, 0) + 1
    # broadcast_updates() only sends when this number differs from the one it
    # last sent. Without the bump it stays at its initial value forever and
    # nothing is pushed after the first broadcast.
    _full_payload_version += 1
    _full_payload_cache = None
|
|
|
|
def _get_broadcast_payload() -> str:
    """Serialize the current state into the JSON message sent to clients.

    The message is ``current_data`` with a leading ``"type": "update"`` field,
    encoded without whitespace. It is rebuilt on every call.
    """
    # No aggressive version caching to ensure immediate movement
    message = {"type": "update"}
    message.update(current_data)
    return json.dumps(message, separators=(",", ":"))
|
|
|
|
|
|
async def update_maritime():
    """Refresh ``current_data["ships"]`` from ``fetch_ships()`` every 30 seconds.

    Any exception is logged and the loop carries on with the next cycle.
    """
    while True:
        try:
            ships = await fetch_ships()
            current_data["ships"] = ships
            current_data["last_updated"]["ships"] = _ts()
            _mark_changed("ships")
        except Exception as exc:
            print(f"[MARITIME] Error: {exc}")
        await asyncio.sleep(30)
|
|
|
|
async def update_planes_and_gps():
    """Refresh planes, GPS interference and emergency squawks every 20 seconds.

    The three feeds come from a single ``fetch_planes()`` call. State is only
    replaced when the result is a dict with a non-empty ``"planes"`` entry;
    otherwise the previous values are kept. Exceptions are logged and the loop
    continues.
    """
    while True:
        try:
            airspace = await fetch_planes()
            has_planes = isinstance(airspace, dict) and airspace.get("planes")
            if has_planes:
                current_data.update(
                    planes=airspace.get("planes", []),
                    gps_interference=airspace.get("interference", []),
                    emergency_squawks=airspace.get("emergencies", []),
                )
                current_data["last_updated"]["planes"] = _ts()
                _mark_changed("planes")
        except Exception as exc:
            print(f"[AIRSPACE] Error: {exc}")
        await asyncio.sleep(20)
|
|
|
|
async def update_satellites():
    """Refresh ``current_data["satellites"]`` every 60 seconds.

    Exceptions are logged and the loop continues.
    """
    while True:
        try:
            # Satellite positions are computed from TLEs.
            # The TLE fetch itself should be cached in satellites.py.
            positions = await get_satellite_positions()
            current_data["satellites"] = positions
            current_data["last_updated"]["satellites"] = _ts()
            _mark_changed("satellites")
        except Exception as exc:
            print(f"[SPACE] Error: {exc}")
        # Position computation update interval
        await asyncio.sleep(60)
|
|
|
|
async def update_cyber():
    """Refresh ``current_data["cyber_attacks"]`` every 5 minutes.

    The current BGP status is passed to ``fetch_cyber_warfare``. An empty
    result leaves the previous data in place. Exceptions are logged and the
    loop continues.
    """
    while True:
        try:
            attacks = await fetch_cyber_warfare(current_data.get("bgp"))
            # Only update if we got real data
            if attacks:
                current_data["cyber_attacks"] = attacks
                current_data["last_updated"]["cyber_attacks"] = _ts()
                _mark_changed("cyber_attacks")
        except Exception as exc:
            print(f"[CYBER] Error: {exc}")
        # ThreatFox limits (5 mins)
        await asyncio.sleep(300)
|
|
|
|
async def update_news_and_ai():
    """Refresh news and BGP status, then re-run the threat analysis, every 5 minutes.

    Empty news or BGP results leave the previous values in place. The analysis
    runs only when there is news or conflict data to look at. Exceptions are
    logged and the loop continues.
    """
    while True:
        try:
            headlines = await fetch_news()
            bgp_status = await fetch_bgp_status()
            if headlines:
                current_data["news"] = headlines
            if bgp_status:
                current_data["bgp"] = bgp_status
            current_data["last_updated"]["news"] = _ts()
            _mark_changed("news")

            if current_data["news"] or current_data["conflicts"]:
                current_data["ai_analysis"] = await analyze_threat(
                    current_data["news"],
                    current_data["gps_interference"],
                    current_data["bgp"],
                    conflicts=current_data["conflicts"],
                    ships=current_data["ships"],
                    planes=current_data["planes"],
                )
                _mark_changed("ai")
        except Exception as exc:
            print(f"[INTEL] Error: {exc}")
        # RSS and AI limits (5 mins)
        await asyncio.sleep(300)
|
|
|
|
|
|
async def update_conflicts():
    """Refresh ``current_data["conflicts"]`` every 10 minutes.

    An empty result leaves the previous data in place. Exceptions are logged
    and the loop continues.
    """
    while True:
        try:
            latest = await fetch_conflicts()
            if latest:
                current_data["conflicts"] = latest
                current_data["last_updated"]["conflicts"] = _ts()
                _mark_changed("conflicts")
        except Exception as exc:
            print(f"[CONFLICTS] Error: {exc}")
        # GDELT data (10 mins)
        await asyncio.sleep(600)
|
|
|
|
|
|
async def update_webcams():
    """Refresh ``current_data["webcams"]`` once an hour.

    An empty result leaves the previous data in place. Exceptions are logged
    and the loop continues.
    """
    while True:
        try:
            cams = await fetch_webcams()
            if cams:
                current_data["webcams"] = cams
                current_data["last_updated"]["webcams"] = _ts()
                _mark_changed("webcams")
        except Exception as exc:
            print(f"[WEBCAMS] Error: {exc}")
        # Rarely changes
        await asyncio.sleep(3600)
|
|
|
|
|
|
async def broadcast_updates():
    """Push the current state to every connected client when it changes.

    Polls every 2 seconds. A broadcast happens only when at least one client
    is connected and ``_full_payload_version`` differs from the version that
    was last sent. Clients whose send fails are dropped from ``clients``.

    Any other failure in a cycle (for example the payload failing to
    serialize) is logged and the loop keeps running, matching the other
    background loops in this module; previously such an error ended the
    broadcaster for the lifetime of the process.
    """
    last_version = -1
    while True:
        try:
            if clients and _full_payload_version != last_version:
                payload = _get_broadcast_payload()
                last_version = _full_payload_version
                dead_clients: set = set()
                # Iterate over a copy: clients may connect or disconnect
                # while a send is awaited.
                for client in list(clients):
                    try:
                        await client.send_text(payload)
                    except Exception:
                        dead_clients.add(client)
                clients.difference_update(dead_clients)
        except Exception as e:
            print(f"[BROADCAST] Error: {e}")
        await asyncio.sleep(2)
|
|
|
|
|
|
def _save_snapshot():
    """Write the current state to a timestamped JSON file and prune old ones.

    The file is named after the current UTC time (``YYYYMMDDTHHMMSSZ.json``)
    inside ``HISTORY_DIR``. Afterwards only the ``MAX_SNAPSHOTS`` files that
    sort last by name are kept.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

    # Summary fields first, then the feeds copied verbatim.
    snapshot = {
        "timestamp": _ts(),
        "planes_count": len(current_data["planes"]),
        "ships_count": len(current_data["ships"]),
        "satellites_count": len(current_data["satellites"]),
    }
    copied_feeds = (
        "planes",
        "ships",
        "gps_interference",
        "emergency_squawks",
        "news",
        "conflicts",
        "ai_analysis",
    )
    for feed in copied_feeds:
        snapshot[feed] = current_data[feed]

    target = HISTORY_DIR / f"{stamp}.json"
    target.write_text(json.dumps(snapshot, separators=(",", ":")))

    # Prune old snapshots
    stale = sorted(HISTORY_DIR.glob("*.json"))[:-MAX_SNAPSHOTS]
    for old_file in stale:
        old_file.unlink(missing_ok=True)
|
|
|
|
|
|
async def save_snapshots():
    """Save a snapshot for time-travel review once every 60 seconds.

    The first snapshot is taken after the initial 60-second wait. A failed
    save is logged and does not stop the loop.
    """
    while True:
        await asyncio.sleep(60)
        try:
            _save_snapshot()
        except Exception as exc:
            print(f"[HISTORY] Snapshot error: {exc}")
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the background tasks on startup and stop them on shutdown.

    Every worker below runs as its own asyncio task for the lifetime of the
    application. On shutdown all tasks are cancelled and then awaited, so they
    can unwind before the event loop goes away.

    Args:
        app: The FastAPI application (unused; required by the lifespan
            protocol).
    """
    workers = (
        ais_worker,
        ais_pruner,
        update_maritime,
        update_planes_and_gps,
        update_satellites,
        update_cyber,
        update_news_and_ai,
        update_webcams,
        update_conflicts,
        broadcast_updates,
        save_snapshots,
    )
    tasks = [asyncio.create_task(worker()) for worker in workers]
    print("[ARGUS NEXUS] All 9 data pipelines online.")
    try:
        yield
    finally:
        for t in tasks:
            t.cancel()
        # Wait for the cancellations to be processed instead of leaving
        # pending tasks behind. The timeout keeps shutdown from hanging if a
        # task does not react to cancellation.
        await asyncio.wait(tasks, timeout=5)
|
|
|
|
|
|
# Application instance; background tasks are managed by `lifespan`.
app = FastAPI(lifespan=lifespan)

# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive — confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
@app.get("/")
async def root():
    """Report service status, connected-client count, item counts and update times."""
    # Response label -> key in current_data whose length is reported.
    counted_feeds = {
        "planes": "planes",
        "satellites": "satellites",
        "ships": "ships",
        "news": "news",
        "webcams": "webcams",
        "gps_interference": "gps_interference",
        "emergencies": "emergency_squawks",
        "conflicts": "conflicts",
    }
    active_clients = len(clients)
    data_counts = {
        label: len(current_data[feed]) for label, feed in counted_feeds.items()
    }
    return {
        "status": "ONLINE",
        "system": "ARGUS NEXUS",
        "version": "2.1.0",
        "active_clients": active_clients,
        "data_counts": data_counts,
        "last_updated": current_data["last_updated"],
    }
|
|
|
|
|
|
@app.get("/health")
async def health():
    """Return a fixed "healthy" status together with the current UTC timestamp."""
    body = {"status": "healthy"}
    body["timestamp"] = _ts()
    return body
|
|
|
|
|
|
@app.get("/history")
async def list_history():
    """List available data snapshots for time-travel review.

    Returns summary metadata for up to ``MAX_SNAPSHOTS`` snapshot files,
    ordered by file name descending. Files that cannot be read or parsed are
    skipped.
    """
    newest_first = sorted(HISTORY_DIR.glob("*.json"), reverse=True)[:MAX_SNAPSHOTS]
    snapshots = []
    for snapshot_file in newest_first:
        try:
            meta = json.loads(snapshot_file.read_text())
            entry = {
                "id": snapshot_file.stem,
                "timestamp": meta.get("timestamp", ""),
                "planes": meta.get("planes_count", 0),
                "ships": meta.get("ships_count", 0),
                "gps_zones": len(meta.get("gps_interference", [])),
                "news": len(meta.get("news", [])),
                "threat_level": meta.get("ai_analysis", {}).get("threat_level", ""),
            }
        except Exception:
            # Unreadable or malformed snapshot: leave it out of the listing.
            continue
        else:
            snapshots.append(entry)
    return {"snapshots": snapshots}
|
|
|
|
|
|
@app.get("/history/{snapshot_id}")
async def get_snapshot(snapshot_id: str):
    """Retrieve a specific historical data snapshot.

    Args:
        snapshot_id: File stem of the snapshot, as listed by ``/history``.

    Returns:
        The parsed snapshot, or a 404 response with an ``"error"`` message
        when no such snapshot exists.
    """
    # Local import: JSONResponse is only needed here.
    from fastapi.responses import JSONResponse

    filename = f"{snapshot_id}.json"
    path = HISTORY_DIR / filename
    # `path.name != filename` means the id carried a directory component and
    # would resolve outside HISTORY_DIR; treat that as not found as well.
    if path.name != filename or not path.is_file():
        # Returning a `(body, 404)` tuple is a Flask idiom; FastAPI would
        # serialize the tuple as a JSON array and answer 200.
        return JSONResponse(
            status_code=404,
            content={"error": "Snapshot not found"},
        )
    return json.loads(path.read_text())
|
|
|
|
|
|
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Register a client, send it the current state, and hold the connection.

    After the initial payload the handler only waits on incoming messages
    (their content is ignored) so that a disconnect is noticed. The client is
    always removed from ``clients`` when the handler exits.

    Args:
        websocket: The incoming WebSocket connection.
    """
    # Local import: the exception type is only needed here.
    from fastapi import WebSocketDisconnect

    await websocket.accept()
    clients.add(websocket)
    print(f"[WS] Client connected. Total: {len(clients)}")
    try:
        await websocket.send_text(_get_broadcast_payload())
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Normal end of the connection; nothing to report.
        pass
    except Exception as e:
        # Anything else (e.g. the payload failing to serialize) used to be
        # swallowed silently; log it so the cause is visible.
        print(f"[WS] Error: {e}")
    finally:
        clients.discard(websocket)
        print(f"[WS] Client disconnected. Total: {len(clients)}")
|
|
|
|
|
|
# Script entry point: serve the app with uvicorn on port 8000.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, port=8000, host="0.0.0.0")
|