178 lines
6.8 KiB
Python
178 lines
6.8 KiB
Python
import asyncio
|
||
import gzip
|
||
import json
|
||
import websockets
|
||
import httpx
|
||
import os
|
||
from datetime import datetime, timezone
|
||
from dotenv import load_dotenv
|
||
|
||
load_dotenv()
|
||
|
||
AIS_STREAM_URL = "wss://stream.aisstream.io/v1/stream"
|
||
API_KEY = os.getenv("AIS_STREAM_KEY", "185a6c61223ce81ed70cec58eab359009ebd5a0e")
|
||
|
||
_real_vessel_cache: dict = {}
|
||
_real_vessel_lock = asyncio.Lock()
|
||
|
||
def get_vessel_category(ship_type):
|
||
if not ship_type: return "Other"
|
||
if 70 <= ship_type <= 79: return "Cargo"
|
||
if 80 <= ship_type <= 89: return "Tanker"
|
||
if 60 <= ship_type <= 69: return "Passenger"
|
||
if ship_type == 35: return "Military"
|
||
if 30 <= ship_type <= 34: return "Fishing"
|
||
return "Other"
|
||
|
||
async def ais_worker():
|
||
if not API_KEY:
|
||
print("[MARITIME] Missing API Key.")
|
||
return
|
||
print("[MARITIME] Starting Global AIS Stream...")
|
||
while True:
|
||
try:
|
||
async with websockets.connect(AIS_STREAM_URL, ping_interval=20, ping_timeout=20) as ws:
|
||
await ws.send(json.dumps({
|
||
"APIKey": API_KEY,
|
||
"BoundingBoxes": [[[-90, -180], [90, 180]]],
|
||
"FilterMessageTypes": ["PositionReport"]
|
||
}))
|
||
msg_count = 0
|
||
async for message in ws:
|
||
msg = json.loads(message)
|
||
meta = msg.get("MetaData", {})
|
||
mmsi = meta.get("MMSI")
|
||
if not mmsi:
|
||
continue
|
||
async with _real_vessel_lock:
|
||
mid = str(mmsi)
|
||
if mid not in _real_vessel_cache:
|
||
_real_vessel_cache[mid] = {"id": mid, "source": "aisstream"}
|
||
v = _real_vessel_cache[mid]
|
||
v["last_seen"] = datetime.now(timezone.utc)
|
||
name = meta.get("ShipName", "").strip()
|
||
if name:
|
||
v["name"] = name
|
||
elif "name" not in v:
|
||
v["name"] = f"MMSI-{mmsi}"
|
||
ship_type = meta.get("ShipType", 0)
|
||
v["type"] = get_vessel_category(ship_type)
|
||
if msg["MessageType"] == "PositionReport":
|
||
pos = msg["Message"]["PositionReport"]
|
||
lat = pos.get("Latitude")
|
||
lon = pos.get("Longitude")
|
||
# Filter invalid coordinates (land-based noise, 0/0, out of range)
|
||
if (lat is None or lon is None
|
||
or abs(lat) > 90 or abs(lon) > 180
|
||
or (abs(lat) < 0.1 and abs(lon) < 0.1)):
|
||
continue
|
||
v["lat"] = round(lat, 5)
|
||
v["lon"] = round(lon, 5)
|
||
v["velocity"] = round(pos.get("Sog", 0) * 0.5144, 2)
|
||
v["heading"] = pos.get("Cog", 0)
|
||
msg_count += 1
|
||
if msg_count % 500 == 0:
|
||
print(f"[MARITIME] {len(_real_vessel_cache)} vessels cached")
|
||
except Exception as e:
|
||
print(f"[MARITIME] Stream error: {e}. Reconnecting in 10s...")
|
||
await asyncio.sleep(10)
|
||
|
||
|
||
async def ais_pruner():
|
||
while True:
|
||
await asyncio.sleep(60)
|
||
now = datetime.now(timezone.utc)
|
||
async with _real_vessel_lock:
|
||
stale = [k for k, v in _real_vessel_cache.items()
|
||
if (now - v["last_seen"]).total_seconds() > 1200]
|
||
for k in stale:
|
||
del _real_vessel_cache[k]
|
||
|
||
|
||
async def _fetch_digitraffic() -> list:
|
||
"""
|
||
Digitraffic AIS — free, no key, Baltic/European coverage.
|
||
Uses `from` parameter to get only vessels seen in the last 20 minutes.
|
||
Typically returns 300–700 active vessels with fresh positions.
|
||
"""
|
||
import time
|
||
try:
|
||
from_ms = int((time.time() - 1200) * 1000) # last 20 minutes
|
||
headers = {
|
||
"Accept": "application/json",
|
||
"Accept-Encoding": "gzip",
|
||
"Digitraffic-User": "GodsEye/3.0",
|
||
}
|
||
async with httpx.AsyncClient(timeout=15.0) as client:
|
||
r = await client.get(
|
||
f"https://meri.digitraffic.fi/api/ais/v1/locations?from={from_ms}",
|
||
headers=headers,
|
||
)
|
||
if r.status_code != 200:
|
||
print(f"[MARITIME] Digitraffic {r.status_code}")
|
||
return []
|
||
raw = r.content
|
||
try:
|
||
data = json.loads(gzip.decompress(raw))
|
||
except Exception:
|
||
data = r.json()
|
||
|
||
features = data.get("features", []) if isinstance(data, dict) else []
|
||
ships = []
|
||
for item in features:
|
||
geo = (item.get("geometry") or {}).get("coordinates", [])
|
||
if len(geo) < 2:
|
||
continue
|
||
lon, lat = geo[0], geo[1]
|
||
if abs(lat) > 90 or abs(lon) > 180 or (abs(lat) < 0.01 and abs(lon) < 0.01):
|
||
continue
|
||
props = item.get("properties", {})
|
||
mmsi = str(item.get("mmsi") or props.get("mmsi", ""))
|
||
sog = props.get("sog", 0) or 0
|
||
heading = props.get("heading", 0) or props.get("cog", 0) or 0
|
||
ships.append({
|
||
"id": f"dt_{mmsi}",
|
||
"name": f"MMSI-{mmsi}",
|
||
"lat": round(lat, 5),
|
||
"lon": round(lon, 5),
|
||
"velocity": round(sog * 0.5144, 2),
|
||
"heading": heading,
|
||
"type": "Cargo",
|
||
"source": "digitraffic",
|
||
})
|
||
print(f"[MARITIME] Digitraffic: {len(ships)} vessels (last 20min)")
|
||
return ships
|
||
except Exception as e:
|
||
print(f"[MARITIME] Digitraffic error: {e}")
|
||
return []
|
||
|
||
|
||
_dt_cache: list = []
|
||
_dt_cache_time: float = 0.0
|
||
_DT_TTL = 300.0 # refresh Digitraffic every 5 minutes
|
||
|
||
|
||
async def fetch_ships() -> list:
|
||
"""Merge AISStream (global, real-time) + Digitraffic (Baltic, 5-min cache)."""
|
||
import time
|
||
global _dt_cache, _dt_cache_time
|
||
|
||
# Refresh Digitraffic cache if stale
|
||
if time.monotonic() - _dt_cache_time > _DT_TTL:
|
||
_dt_cache = await _fetch_digitraffic()
|
||
_dt_cache_time = time.monotonic()
|
||
|
||
async with _real_vessel_lock:
|
||
stream_ships = [
|
||
v for v in _real_vessel_cache.values()
|
||
if v.get("lat") is not None and v.get("lon") is not None
|
||
]
|
||
|
||
# Merge: AISStream takes priority, Digitraffic fills in the rest
|
||
if stream_ships:
|
||
stream_ids = {s["id"] for s in stream_ships}
|
||
dt_extra = [s for s in _dt_cache if s["id"] not in stream_ids]
|
||
return stream_ships + dt_extra
|
||
|
||
return _dt_cache
|